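Observed latitude/longitude range in `df`. The station filter below uses these values, rounded outward, as the bay-area bounding box.

```python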
>>> df["Latitude"].min()
36.974922
>>> df["Latitude"].max()
37.558388
>>> df["Longitude"].min()
-122.17364
>>> df["Longitude"].max()
-121.54903
```

In [1]:
import polars as pl
import os

In [2]:
# Input/output folders, relative to the notebook's working directory.
RAW_DATA_FOLDER, STAGING_DATA_FOLDER = "raw_data", "staging_data"

# Element codes kept from the raw yearly files; each one becomes its own
# column when the long-format data is pivoted.
ELEMENTS = ["TMAX", "TMIN", "PRCP"]

In [3]:
# Station metadata (id, latitude, longitude). The file comes from a previous
# semester's DB project; the NCDC CDO (Climate Data Online) REST API can also
# provide the same data.
# Only stations inside the bay-area bounding box are kept. The bounds are the
# observed min/max coordinates from the snippet at the top, rounded outward.
BAY_AREA_MIN_LATITUDE, BAY_AREA_MAX_LATITUDE = 36.97, 37.56
BAY_AREA_MIN_LONGITUDE, BAY_AREA_MAX_LONGITUDE = -122.18, -121.54

stations = (
    pl.scan_csv(os.path.join(RAW_DATA_FOLDER, "stations.csv"))
    .select("id", "latitude", "longitude")
    .with_columns(pl.col("latitude", "longitude").cast(pl.Float32))
    .filter(
        pl.col("latitude").ge(BAY_AREA_MIN_LATITUDE)
        & pl.col("latitude").le(BAY_AREA_MAX_LATITUDE)
        & pl.col("longitude").ge(BAY_AREA_MIN_LONGITUDE)
        & pl.col("longitude").le(BAY_AREA_MAX_LONGITUDE)
    )
)

In [4]:
# Load the 2014 daily observations and pivot them to one row per (ID, DATE):
#   keep the needed columns -> parse DATE (YYYYMMDD) -> scale DATA_VALUE by 1/10
#   -> keep only ELEMENTS -> keep only bay-area stations
#   -> drop duplicate (ID, DATE, ELEMENT) rows -> one column per element.
# NOTE(review): "/ 10" presumably converts tenths of a unit to whole units;
# confirm against the source data documentation.
# NOTE(review): unique() keeps an arbitrary row per key by default, so
# conflicting duplicates in the raw file would not be resolved deterministically.
climate = (
    pl.scan_csv(os.path.join(RAW_DATA_FOLDER, "2014.csv"))
    .select([pl.col("ID"), pl.col("DATE"), pl.col("ELEMENT"), pl.col("DATA_VALUE")])
    .with_columns(pl.col("DATE").cast(pl.Utf8).str.strptime(pl.Date, "%Y%m%d"))
    .with_columns((pl.col("DATA_VALUE") / 10).cast(pl.Float32))
    .filter(pl.col("ELEMENT").is_in(ELEMENTS))
    # is_in expects a Series/list/expression: convert the collected one-column
    # frame explicitly rather than relying on implicit DataFrame conversion.
    .filter(pl.col("ID").is_in(stations.select("id").collect().to_series()))
    .unique(["ID", "DATE", "ELEMENT"])
    .group_by(["ID", "DATE"])  # pivot: one output column per element
    .agg(
        # After unique(), each (ID, DATE, ELEMENT) has at most one value, so
        # mean() just extracts it; an element missing that day becomes null.
        pl.col("DATA_VALUE")
        .filter(pl.col("ELEMENT").eq(element))
        .alias(element)
        .mean()
        for element in ELEMENTS
    )
)

Example: the raw yearly files are in long format, with one row per station, date and element:

| id    |   date        |   element |   datavalue   |
| ----- | ------------- | --------- | ------------- |
| 1     |   2014/1/1    |   tmax    |   30.0        |
| 1     |   2014/1/1    |   tmin    |   12          |

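After the group-by/aggregate pivot there is one row per (station, date) and one column per element. An element not reported that day has no value: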

| id    |   date        |   tmax    |   tmin    |   prcp    |
| ----- | ------------- | --------- | --------- | ----------|
| 1     |   2014/1/1    |   30.0    |   12      |   N/A     |

In [5]:
# Non-null value count per column (bare last expression -> rich table display).
climate.count().collect()

shape: (1, 5)
┌───────┬───────┬──────┬──────┬───────┐
│ ID    ┆ DATE  ┆ TMAX ┆ TMIN ┆ PRCP  │
│ ---   ┆ ---   ┆ ---  ┆ ---  ┆ ---   │
│ u32   ┆ u32   ┆ u32  ┆ u32  ┆ u32   │
╞═══════╪═══════╪══════╪══════╪═══════╡
│ 15717 ┆ 15717 ┆ 6085 ┆ 6085 ┆ 13165 │
└───────┴───────┴──────┴──────┴───────┘


In [6]:
# Load the 2015 daily observations with the same pipeline as 2014 (parse DATE,
# scale DATA_VALUE by 1/10, keep ELEMENTS and bay-area stations, de-duplicate,
# pivot to one column per element) and append them to `climate`.
# NOTE(review): re-running this cell appends 2015 again (duplicate rows);
# restart from the 2014 cell instead.
lf = (
    pl.scan_csv(os.path.join(RAW_DATA_FOLDER, "2015.csv"))
    .select([pl.col("ID"), pl.col("DATE"), pl.col("ELEMENT"), pl.col("DATA_VALUE")])
    .with_columns(pl.col("DATE").cast(pl.Utf8).str.strptime(pl.Date, "%Y%m%d"))
    .with_columns((pl.col("DATA_VALUE") / 10).cast(pl.Float32))
    .filter(pl.col("ELEMENT").is_in(ELEMENTS))
    # Explicit Series for is_in instead of an implicitly converted DataFrame.
    .filter(pl.col("ID").is_in(stations.select("id").collect().to_series()))
    .unique(["ID", "DATE", "ELEMENT"])
    .group_by(["ID", "DATE"])  # pivot: one output column per element
    .agg(
        pl.col("DATA_VALUE")
        .filter(pl.col("ELEMENT").eq(element))
        .alias(element)
        .mean()
        for element in ELEMENTS
    )
)
climate = pl.concat([climate, lf], how="vertical")

In [7]:
# Non-null value count per column after appending 2015.
climate.count().collect()

shape: (1, 5)
┌───────┬───────┬───────┬───────┬───────┐
│ ID    ┆ DATE  ┆ TMAX  ┆ TMIN  ┆ PRCP  │
│ ---   ┆ ---   ┆ ---   ┆ ---   ┆ ---   │
│ u32   ┆ u32   ┆ u32   ┆ u32   ┆ u32   │
╞═══════╪═══════╪═══════╪═══════╪═══════╡
│ 31838 ┆ 31838 ┆ 12003 ┆ 11989 ┆ 26724 │
└───────┴───────┴───────┴───────┴───────┘


In [8]:
# Load the 2016 daily observations with the same pipeline as 2014 (parse DATE,
# scale DATA_VALUE by 1/10, keep ELEMENTS and bay-area stations, de-duplicate,
# pivot to one column per element) and append them to `climate`.
# NOTE(review): re-running this cell appends 2016 again (duplicate rows);
# restart from the 2014 cell instead.
lf = (
    pl.scan_csv(os.path.join(RAW_DATA_FOLDER, "2016.csv"))
    .select([pl.col("ID"), pl.col("DATE"), pl.col("ELEMENT"), pl.col("DATA_VALUE")])
    .with_columns(pl.col("DATE").cast(pl.Utf8).str.strptime(pl.Date, "%Y%m%d"))
    .with_columns((pl.col("DATA_VALUE") / 10).cast(pl.Float32))
    .filter(pl.col("ELEMENT").is_in(ELEMENTS))
    # Explicit Series for is_in instead of an implicitly converted DataFrame.
    .filter(pl.col("ID").is_in(stations.select("id").collect().to_series()))
    .unique(["ID", "DATE", "ELEMENT"])
    .group_by(["ID", "DATE"])  # pivot: one output column per element
    .agg(
        pl.col("DATA_VALUE")
        .filter(pl.col("ELEMENT").eq(element))
        .alias(element)
        .mean()
        for element in ELEMENTS
    )
)
climate = pl.concat([climate, lf], how="vertical")

In [9]:
# Non-null value count per column after appending 2016.
climate.count().collect()

shape: (1, 5)
┌───────┬───────┬───────┬───────┬───────┐
│ ID    ┆ DATE  ┆ TMAX  ┆ TMIN  ┆ PRCP  │
│ ---   ┆ ---   ┆ ---   ┆ ---   ┆ ---   │
│ u32   ┆ u32   ┆ u32   ┆ u32   ┆ u32   │
╞═══════╪═══════╪═══════╪═══════╪═══════╡
│ 47861 ┆ 47861 ┆ 18005 ┆ 17969 ┆ 40193 │
└───────┴───────┴───────┴───────┴───────┘


In [10]:
# Load the 2017 daily observations with the same pipeline as 2014 (parse DATE,
# scale DATA_VALUE by 1/10, keep ELEMENTS and bay-area stations, de-duplicate,
# pivot to one column per element) and append them to `climate`.
# NOTE(review): re-running this cell appends 2017 again (duplicate rows);
# restart from the 2014 cell instead.
lf = (
    pl.scan_csv(os.path.join(RAW_DATA_FOLDER, "2017.csv"))
    .select([pl.col("ID"), pl.col("DATE"), pl.col("ELEMENT"), pl.col("DATA_VALUE")])
    .with_columns(pl.col("DATE").cast(pl.Utf8).str.strptime(pl.Date, "%Y%m%d"))
    .with_columns((pl.col("DATA_VALUE") / 10).cast(pl.Float32))
    .filter(pl.col("ELEMENT").is_in(ELEMENTS))
    # Explicit Series for is_in instead of an implicitly converted DataFrame.
    .filter(pl.col("ID").is_in(stations.select("id").collect().to_series()))
    .unique(["ID", "DATE", "ELEMENT"])
    .group_by(["ID", "DATE"])  # pivot: one output column per element
    .agg(
        pl.col("DATA_VALUE")
        .filter(pl.col("ELEMENT").eq(element))
        .alias(element)
        .mean()
        for element in ELEMENTS
    )
)
climate = pl.concat([climate, lf], how="vertical")

In [11]:
# Non-null value count per column after appending 2017.
climate.count().collect()

shape: (1, 5)
┌───────┬───────┬───────┬───────┬───────┐
│ ID    ┆ DATE  ┆ TMAX  ┆ TMIN  ┆ PRCP  │
│ ---   ┆ ---   ┆ ---   ┆ ---   ┆ ---   │
│ u32   ┆ u32   ┆ u32   ┆ u32   ┆ u32   │
╞═══════╪═══════╪═══════╪═══════╪═══════╡
│ 63667 ┆ 63667 ┆ 23613 ┆ 23568 ┆ 53419 │
└───────┴───────┴───────┴───────┴───────┘


In [12]:
# Keep only stations with reasonably complete records. A station is retained
# when fewer than MAX_NULL_FRACTION of its rows have a null PRCP AND fewer
# than MAX_NULL_FRACTION have a null TMAX (i.e. > 95% of its rows report both).
# NOTE(review): TMIN is not checked, and the denominator is the number of rows
# the station reported, not the number of calendar days in the period, so a
# station that skipped whole days can still pass. Confirm this is intended.
MAX_NULL_FRACTION = 0.05

climate95 = (
    climate.group_by("ID")
    .agg(
        [
            pl.col("DATE").len(),  # rows reported by the station
            pl.col("TMAX").null_count(),
            pl.col("TMIN").null_count(),
            pl.col("PRCP").null_count(),
        ]
    )
    .filter((pl.col("PRCP") / pl.col("DATE")) < MAX_NULL_FRACTION)
    .filter((pl.col("TMAX") / pl.col("DATE")) < MAX_NULL_FRACTION)
    .select("ID")
    .collect()
)
# is_in expects a Series/list/expression, so pass the ID column explicitly.
climate = climate.filter(pl.col("ID").is_in(climate95.to_series()))

In [13]:
# Non-null value count per column after dropping incomplete stations.
climate.count().collect()

shape: (1, 5)
┌───────┬───────┬───────┬───────┬───────┐
│ ID    ┆ DATE  ┆ TMAX  ┆ TMIN  ┆ PRCP  │
│ ---   ┆ ---   ┆ ---   ┆ ---   ┆ ---   │
│ u32   ┆ u32   ┆ u32   ┆ u32   ┆ u32   │
╞═══════╪═══════╪═══════╪═══════╪═══════╡
│ 13448 ┆ 13448 ┆ 13403 ┆ 13358 ┆ 13410 │
└───────┴───────┴───────┴───────┴───────┘


In [14]:
# Dates used to build the station x date grid. The file's `id` column holds
# YYYYMMDD values; they are parsed into a single Date column named DATE.
dates = pl.scan_csv(os.path.join(STAGING_DATA_FOLDER, "dates.csv")).select(
    pl.col("id").cast(pl.Utf8).str.strptime(pl.Date, "%Y%m%d").alias("DATE")
)

In [15]:
# Build the complete (station, date) grid: every retained station paired with
# every date from `dates` (a Cartesian product). A cross join takes no join
# keys; older polars silently ignores `on=` and newer polars rejects it.
unique_stations = climate.select("ID").unique().join(dates, how="cross")

# Left-join the observations onto the grid. Station-days absent from the raw
# files then appear as rows with null TMAX/TMIN/PRCP, ready for imputation.
climate = unique_stations.join(climate, on=["ID", "DATE"], how="left")

In [16]:
# Non-null value count per column over the full station x date grid. ID/DATE
# count every grid row; element counts show how many values are observed.
climate.count().collect()

shape: (1, 5)
┌───────┬───────┬───────┬───────┬───────┐
│ ID    ┆ DATE  ┆ TMAX  ┆ TMIN  ┆ PRCP  │
│ ---   ┆ ---   ┆ ---   ┆ ---   ┆ ---   │
│ u32   ┆ u32   ┆ u32   ┆ u32   ┆ u32   │
╞═══════╪═══════╪═══════╪═══════╪═══════╡
│ 14610 ┆ 14610 ┆ 13403 ┆ 13358 ┆ 13410 │
└───────┴───────┴───────┴───────┴───────┘


In [17]:
def impute_median(df, column_name, group_keys=("ID", "YEAR", "MONTH")):
    """Fill nulls in ``column_name`` with the median of its group.

    The median is computed per ``group_keys`` group (by default per station,
    year and month) over that group's non-null values. Non-null values are
    kept unchanged. A group whose values are all null stays null.

    Args:
        df: polars DataFrame or LazyFrame containing ``column_name`` and every
            column in ``group_keys``.
        column_name: name of the column to impute.
        group_keys: columns that define the imputation groups.

    Returns:
        A frame of the same kind with ``column_name`` imputed. Row order and
        all other columns are preserved.
    """
    # A window expression computes the group median in place. This replaces the
    # previous inner join against a per-group median table, which could silently
    # drop rows whose key columns are null and did not guarantee row order.
    group_median = pl.col(column_name).median().over(list(group_keys))
    return df.with_columns(pl.col(column_name).fill_null(group_median))

In [18]:
# Impute missing TMAX/TMIN/PRCP values with the median of the same station,
# year and month. YEAR and MONTH are temporary grouping columns.
# Note: frame.pipe(f, *args) simply calls f(frame, *args) once on the whole
# frame; it does not iterate over rows.
climate = climate.with_columns(
    pl.col("DATE").dt.year().alias("YEAR"), pl.col("DATE").dt.month().alias("MONTH")
)
for imputed_column in ("TMAX", "TMIN", "PRCP"):
    climate = climate.pipe(impute_median, imputed_column)
climate = climate.drop(["YEAR", "MONTH"])

In [19]:
# Non-null value count per column after monthly median imputation.
climate.count().collect()

shape: (1, 5)
┌───────┬───────┬───────┬───────┬───────┐
│ ID    ┆ DATE  ┆ TMAX  ┆ TMIN  ┆ PRCP  │
│ ---   ┆ ---   ┆ ---   ┆ ---   ┆ ---   │
│ u32   ┆ u32   ┆ u32   ┆ u32   ┆ u32   │
╞═══════╪═══════╪═══════╪═══════╪═══════╡
│ 14610 ┆ 14610 ┆ 14338 ┆ 14338 ┆ 14338 │
└───────┴───────┴───────┴───────┴───────┘


In [20]:
# Fill whatever is still null after the monthly imputation (station-months
# with no observed value at all). Temperatures get the overall column median,
# which is less sensitive to outliers than the mean. Precipitation gets 0.0.
climate = climate.with_columns(
    *(pl.col(temp).fill_null(pl.col(temp).median()) for temp in ("TMAX", "TMIN")),
    pl.col("PRCP").fill_null(0.0),
)

In [21]:
# Non-null value count per column; every column should now be fully populated.
climate.count().collect()

shape: (1, 5)
┌───────┬───────┬───────┬───────┬───────┐
│ ID    ┆ DATE  ┆ TMAX  ┆ TMIN  ┆ PRCP  │
│ ---   ┆ ---   ┆ ---   ┆ ---   ┆ ---   │
│ u32   ┆ u32   ┆ u32   ┆ u32   ┆ u32   │
╞═══════╪═══════╪═══════╪═══════╪═══════╡
│ 14610 ┆ 14610 ┆ 14610 ┆ 14610 ┆ 14610 │
└───────┴───────┴───────┴───────┴───────┘


In [22]:
# Attach each station's latitude/longitude via a left join on the station id,
# then keep and rename the output columns (the station ID itself is dropped).
output_columns = {
    "DATE": "Date",
    "latitude": "Latitude",
    "longitude": "Longitude",
    "TMAX": "Tmax",
    "TMIN": "Tmin",
    "PRCP": "Prcp",
}
climate = climate.join(stations, how="left", left_on="ID", right_on="id").select(
    [pl.col(source).alias(target) for source, target in output_columns.items()]
)

In [23]:
# Non-null value count per output column before writing to disk.
climate.count().collect()

shape: (1, 6)
┌───────┬──────────┬───────────┬───────┬───────┬───────┐
│ Date  ┆ Latitude ┆ Longitude ┆ Tmax  ┆ Tmin  ┆ Prcp  │
│ ---   ┆ ---      ┆ ---       ┆ ---   ┆ ---   ┆ ---   │
│ u32   ┆ u32      ┆ u32       ┆ u32   ┆ u32   ┆ u32   │
╞═══════╪══════════╪═══════════╪═══════╪═══════╪═══════╡
│ 14610 ┆ 14610    ┆ 14610     ┆ 14610 ┆ 14610 ┆ 14610 │
└───────┴──────────┴───────────┴───────┴───────┴───────┘


In [24]:
# Materialise the lazy pipeline with the streaming engine and save it to the
# staging folder.
climate_csv_path = os.path.join(STAGING_DATA_FOLDER, "climate.csv")
climate_final = climate.collect(streaming=True)
climate_final.write_csv(climate_csv_path)