In [1]:
import polars as pl
import pendulum

In [2]:
def _from_source(_df):
    """Derive the prefixed feature columns from a raw resale-transactions frame.

    Reads the input columns ``month``, ``block``, ``street_name``, ``town``,
    ``storey_range``, ``lease_commence_date``, ``floor_area_sqm``,
    ``flat_type`` and ``resale_price``. Returns a new frame containing only
    the derived ``x*`` / ``y*`` columns, in the order of the final ``select``.
    Prints start/end markers and the shape of the result.
    """
    print("start: _from_source()")

    # Reusable sub-expressions over the raw columns.
    month_parts = pl.col("month").str.split("-")
    storey_parts = pl.col("storey_range").str.split(by=" ")
    block = pl.col("block")
    street = pl.col("street_name")
    area_sqm = pl.col("floor_area_sqm")
    price = pl.col("resale_price")

    # Every expression in this pass depends only on raw input columns.
    staged = _df.with_columns([
        # Sale year / month: the two "-"-separated parts of `month`.
        month_parts.list.get(0).cast(pl.Int64).alias("x0a_sale_year"),
        month_parts.list.get(1).cast(pl.Int64).alias("x0b_sale_month"),
        # Address and town.
        (block + " " + street).alias("x1a_address"),
        ("B-" + block).alias("x1b_block"),
        street.alias("x1c_road"),
        pl.col("town").alias("x1d_town"),
        # Storey: first and third space-separated tokens of `storey_range`.
        storey_parts.list.get(0).cast(pl.Int64).alias("x3a_storey_min"),
        storey_parts.list.get(2).cast(pl.Int64).alias("x3b_storey_max"),
        # Lease.
        pl.col("lease_commence_date").cast(pl.Int64).alias("x4b_lease_commence_year"),
        # Area (the Int64 casts drop the fractional part) and flat type.
        (area_sqm * 10.7639).cast(pl.Int64).alias("x5a_area_sqft"),
        area_sqm.cast(pl.Int64).alias("x5b_area_sqm"),
        pl.col("flat_type").alias("x5c_flat_type"),
    ])

    # Price per square foot needs the integer sqft column created above, so it
    # is derived in a second pass. The value is scaled by 100, rounded, cast to
    # an integer and scaled back, leaving two decimal places.
    psf_hundredths = (price / pl.col("x5a_area_sqft") * 100).round(2)
    staged = staged.with_columns([
        price.cast(pl.Int64).alias("y1a_price"),
        (psf_hundredths.cast(pl.Int64) / 100).alias("y1b_psf"),
    ])

    # Keep only the derived columns; the raw ones are dropped here.
    result = staged.select([
        "x0a_sale_year", "x0b_sale_month",
        "x1a_address", "x1b_block", "x1c_road", "x1d_town",
        "x3a_storey_min", "x3b_storey_max",
        "x4b_lease_commence_year",
        "x5a_area_sqft", "x5b_area_sqm", "x5c_flat_type",
        "y1a_price", "y1b_psf",
    ])

    print(f"out__df.shape={result.shape}")
    print("end: _from_source()")
    return result

In [3]:
def _from_ingested(_df):
    """Normalise road names and rename the ``x*`` / ``y*`` columns.

    Expects the columns produced by ``_from_source``. Builds ``address`` from
    the block (with its two-character ``B-`` prefix sliced off) and the road,
    expands abbreviated tokens in ``address`` and ``road`` using
    ``TOKENS_SUBSTITUTE``, renames the remaining columns and returns only the
    final column set. Prints start/end markers and the shape of the result.
    """
    # Abbreviated road-name tokens and their full spellings. Each key appears
    # once; "ST" and "ST." are distinct tokens with different expansions.
    TOKENS_SUBSTITUTE = {
        "AVE": "AVENUE",
        "BT": "BUKIT",
        "C'WEALTH": "COMMONWEALTH",
        "CL": "CLOSE",
        "CRES": "CRESCENT",
        "CTRL": "CENTRAL",
        "DR": "DRIVE",
        "GDNS": "GARDENS",
        "JLN": "JALAN",
        "KG": "KAMPUNG",
        "LOR": "LORONG",
        "NTH": "NORTH",
        "PK": "PARK",
        "PL": "PLACE",
        "RD": "ROAD",
        "ST": "STREET",
        "ST.": "SAINT",
        "STH": "SOUTH",
        "TER": "TERRACE",
        "TG": "TANJONG",
        "UPP": "UPPER",
    }

    def road(x):
        """Expand each whitespace-separated token of ``x`` found in the table.

        Tokens are matched exactly (case-sensitive); unknown tokens pass
        through unchanged and runs of whitespace collapse to single spaces.
        """
        return " ".join(TOKENS_SUBSTITUTE.get(t, t) for t in x.split())

    print("start: _from_ingested()")

    # Create address by combining block number (without 'B-' prefix) and road
    out_df = _df.with_columns([
        pl.concat_str([
            pl.col("x1b_block").str.slice(2),
            pl.lit(" "),
            pl.col("x1c_road")
        ]).alias("address")
    ])

    # Apply road substitutions (per-row Python callback on both string columns)
    out_df = out_df.with_columns([
        pl.col("address").map_elements(road, return_dtype=pl.Utf8).alias("address"),
        pl.col("x1c_road").map_elements(road, return_dtype=pl.Utf8).alias("road"),
        pl.col("x1d_town").alias("town")
    ])

    # Rename columns
    out_df = out_df.rename({
        "x0a_sale_year": "year_of_sale",
        "x0b_sale_month": "month_of_sale",
        "x3a_storey_min": "min_storey",
        "x3b_storey_max": "max_storey",
        "x4b_lease_commence_year": "built_year",
        "x5a_area_sqft": "sqft",
        "x5b_area_sqm": "sqm",
        "x5c_flat_type": "flat_type",
        "y1a_price": "price",
        "y1b_psf": "psf",
    })

    # Select final columns; the remaining x1* source columns are dropped here
    out_df = out_df.select([
        "year_of_sale",
        "month_of_sale",
        "min_storey",
        "max_storey",
        "built_year",
        "sqft",
        "sqm",
        "flat_type",
        "price",
        "psf",
        "address",
        "road",
        "town",
    ])

    print(f"out_df.shape={out_df.shape}")
    print("end: _from_ingested()")
    return out_df

In [4]:
def _from_preprocessed(_df, _bands_df, _now=None):
    """Add lease/recency features, join the bands table and flag high prices.

    Parameters
    ----------
    _df :
        Frame produced by ``_from_ingested``.
    _bands_df :
        Frame with a ``lease_remaining`` column; its other columns are
        attached to each row by a left join on that key.
    _now : optional
        Reference date used for ``lease_remaining`` and ``months_ago``. Any
        object exposing ``.year`` and ``.month`` works. Defaults to
        ``pendulum.now()``; pass a fixed value to make the output
        reproducible across runs.

    Returns
    -------
    The selected columns, plus the columns joined from ``_bands_df`` and a
    0/1 ``price_is_geq_one_million`` column. Prints start/end markers and the
    shape of the result.
    """
    print("start: _from_preprocessed()")

    # Without an explicit reference date the result depends on the run date.
    now = pendulum.now() if _now is None else _now
    thisyear = now.year
    thismonth = now.month

    # Rename columns
    dfhdb = _df.rename({
        "min_storey": "minimum_floor",
        "max_storey": "maximum_floor",
    })

    # Both derived columns read only existing columns, so one pass suffices.
    dfhdb = dfhdb.with_columns([
        # Lease remaining at the reference year: 99 - (thisyear - built_year).
        (99 - thisyear + pl.col("built_year")).alias("lease_remaining"),
        # Whole months between the sale month and the reference month.
        ((thisyear - pl.col("year_of_sale")) * 12 +
         (thismonth - pl.col("month_of_sale"))).alias("months_ago"),
    ])

    # Select and reorder columns
    dfhdb = dfhdb.select([
        "address",
        "lease_remaining",
        "minimum_floor",
        "maximum_floor",
        "built_year",
        "sqft",
        "sqm",
        "flat_type",
        "road",
        "town",
        "year_of_sale",
        "month_of_sale",
        "months_ago",
        "price",
        "psf",
    ])

    # Join with bands dataframe; the left join keeps rows with no matching band
    dfhdb = dfhdb.join(
        _bands_df,
        on="lease_remaining",
        how="left"
    )

    # 1 when price >= 1,000,000, else 0
    dfhdb = dfhdb.with_columns(
        (pl.col("price") >= 1_000_000).cast(pl.Int8).alias("price_is_geq_one_million")
    )

    print(f"out_df.shape={dfhdb.shape}")
    print("end: _from_preprocessed()")
    return dfhdb

In [5]:
# Load the raw resale transactions; the two numeric columns are read as Float32.
raw_df = pl.read_csv(
    "Resale Flat Prices (Based on Registration Date), From Jan 2015 to Dec 2016.csv",
    schema_overrides={"resale_price": pl.Float32, "floor_area_sqm": pl.Float32},
)
# Lookup table joined on `lease_remaining` in the last stage.
bands_df = pl.read_csv("bands.csv")

# Run the three stages in order: source -> ingested -> preprocessed.
df = (
    raw_df
    .pipe(_from_source)
    .pipe(_from_ingested)
    .pipe(_from_preprocessed, bands_df)
)

start: _from_source()
out__df.shape=(37153, 14)
end: _from_source()
start: _from_ingested()
out_df.shape=(37153, 13)
end: _from_ingested()
start: _from_preprocessed()
out_df.shape=(37153, 17)
end: _from_preprocessed()


In [6]:
# Persist the fully processed frame (the output of _from_preprocessed) as CSV.
# NOTE(review): presumably the "analysis" directory must already exist — confirm.
df.write_csv("analysis/ingested_2015.csv")