In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

In [0]:
%run ../utils/pyutils

In [0]:
# Storage container to read from. Kept in one variable so the path lookup
# below always follows it instead of repeating the literal.
container_name = "raw"
# NOTE(review): get_wasbs_path is not defined in this notebook; presumably it
# is provided by the ../utils/pyutils notebook executed above — confirm.
ds = get_wasbs_path(container=container_name)

In [0]:
# Explicit read schema: every column is declared as a nullable string.
# Field order follows the order of the names listed below.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in (
        "booliId",
        "streetAddress",
        "constructionYear",
        "objectType",
        "descriptiveAreaName",
        "soldPriceType",
        "daysActive",
        "soldDate",
        "latitude",
        "longitude",
        "url",
        "__typename",
        "soldPrice.raw",
        "rent.raw",
        "floor.raw",
        "soldSqmPrice.raw",
        "soldPriceAbsoluteDiff.raw",
        "soldPricePercentageDiff.raw",
        "listPrice.raw",
        "livingArea.raw",
        "rooms.raw",
    )
])

In [0]:
# Mapping of source column name -> cleaned column name.
# Columns carrying a ".raw" suffix simply drop it; "__typename" is the one
# special case and is kept as the last entry.
rename_dict = {
    **{
        f"{base_name}.raw": base_name
        for base_name in (
            "soldPrice",
            "rent",
            "floor",
            "soldSqmPrice",
            "soldPriceAbsoluteDiff",
            "soldPricePercentageDiff",
            "listPrice",
            "livingArea",
            "rooms",
        )
    },
    "__typename": "typeName",
}

In [0]:
# Read the CSV data under "<ds>/sold/all" with a header row, applying the
# explicit schema instead of letting Spark infer column types.
df = (
    spark.read.format("csv")
    .options(header="true", inferSchema="false")
    .schema(schema)
    .load(f"{ds}/sold/all")
)

In [0]:
# Apply the renames from rename_dict to df.
# DataFrame.withColumnRenamed silently does nothing when the source column is
# absent, so it never raises for a missing column. Check membership first so
# the printed log reflects what actually happened.
for old_name, new_name in rename_dict.items():
    if old_name in df.columns:
        df = df.withColumnRenamed(old_name, new_name)
        print(f"Renamed column {old_name} to {new_name}")
    else:
        print(f"Failed to rename column {old_name} to {new_name}: column not found")


In [0]:
# Register df as the temporary view "sold_raw" (replacing any existing view
# of that name) so it can be queried by name from SQL.
df.createOrReplaceTempView(name="sold_raw")

In [0]:
%sql
-- Project every column of the sold_raw view, listed explicitly.
SELECT
    booliId
  , streetAddress
  , constructionYear
  , objectType
  , descriptiveAreaName
  , soldPriceType
  , daysActive
  , soldDate
  , latitude
  , longitude
  , url
  , typeName
  , soldPrice
  , rent
  , floor
  , soldSqmPrice
  , soldPriceAbsoluteDiff
  , soldPricePercentageDiff
  , listPrice
  , livingArea
  , rooms
FROM sold_raw

In [0]:
%sql
-- List each distinct descriptiveAreaName value present in sold_raw.
SELECT DISTINCT descriptiveAreaName
FROM sold_raw