# Data Preprocessing

## Install required libraries

In [None]:
!pip3 install -U sagemaker
!pip3 install polars

## Import libraries

In [1]:
import os
import io
import re
import boto3
import sagemaker
import polars as pl
import pandas as pd

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [2]:
# Name of the S3 bucket that the cells below list and download parquet objects from.
staging_bucket = "db-project-staging-area"

# Execution role and AWS region as reported by the SageMaker SDK.
# Neither value is referenced by the cells visible in this part of the notebook.
role = sagemaker.get_execution_role()
region = sagemaker.Session().boto_region_name

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [3]:
# Two boto3 handles onto S3: the client is used for listing keys
# (list_objects_v2), the resource for Bucket(...).download_fileobj.
s3client = boto3.client("s3")
s3resource = boto3.resource("s3")

In [4]:
def get_s3_buffer(bucket, key):
    """Download one object into memory and return it as a readable buffer.

    Parameters
    ----------
    bucket
        Object exposing ``download_fileobj(key, fileobj)``; the notebook
        passes ``s3resource.Bucket(...)``.
    key
        Key of the object to download.

    Returns
    -------
    io.BytesIO
        Buffer containing the downloaded bytes, positioned at offset 0.
    """
    buffer = io.BytesIO()
    bucket.download_fileobj(key, buffer)
    # Writing leaves the cursor at the end of the data; rewind so that any
    # consumer that simply calls .read() sees the whole object instead of b"".
    buffer.seek(0)
    return buffer

In [5]:
def first_object_key(prefix):
    """Return the key of the first object listed under ``prefix`` in the staging bucket.

    Parameters
    ----------
    prefix : str
        Key prefix to list, e.g. ``"station/"``.

    Returns
    -------
    str
        The ``Key`` of the first entry in the listing's ``Contents``.

    Raises
    ------
    KeyError
        If the listing contains no objects for this prefix.
    """
    response = s3client.list_objects_v2(Bucket=staging_bucket, Prefix=prefix)
    # S3 omits "Contents" entirely when nothing matches the prefix, which used
    # to surface as an unhelpful KeyError: 'Contents'.
    contents = response.get("Contents")
    if not contents:
        raise KeyError(f"no objects found under s3://{staging_bucket}/{prefix}")
    # NOTE(review): assumes the first listed object is the table's parquet
    # file -- confirm no marker objects sort ahead of it under these prefixes.
    return contents[0]["Key"]


locationcategory_key = first_object_key("locationcategory/")
stationrelation_key = first_object_key("stationrelation/")
location_key = first_object_key("location/")
datatype_key = first_object_key("datatype/")
station_key = first_object_key("station/")
date_key = first_object_key("date/")

In [6]:
# Convert each staged parquet object into a local CSV under staging_data/.

# to_csv does not create missing parent directories, so make sure the output
# folder exists before the first write (no-op when it is already there).
os.makedirs("staging_data", exist_ok=True)

staging_bucket_handle = s3resource.Bucket(staging_bucket)

# Output file stem -> S3 key of the parquet object it is read from.
staged_table_keys = {
    "locationcategory": locationcategory_key,
    "stationrelation": stationrelation_key,
    "location": location_key,
    "datatype": datatype_key,
    "station": station_key,
    "date": date_key,
}

for table_name, table_key in staged_table_keys.items():
    table_frame = pd.read_parquet(get_s3_buffer(staging_bucket_handle, table_key))
    table_frame.to_csv(os.path.join("staging_data", f"{table_name}.csv"), index=False)

In [8]:
# Build one cleaned CSV per year (staging_data/clean<year>.csv) by pivoting the
# staged measurements and joining them with the date and station tables.

# Lookup tables written by the export cell, minus the columns dropped here.
station = pd.read_csv(os.path.join("staging_data", "station.csv")).drop(columns="name")
date = (
    pd.read_csv(os.path.join("staging_data", "date.csv"))
    .drop(columns=["month_name", "day_name", "date", "weekday", "day"])
    .astype({"is_leap_year": "int"})
)
# Re-base the year column so that 2010 (the first year processed) becomes 0.
date["year"] = date["year"] - 2010
data = None  # not used in this cell; kept in case a later cell reads it

staging_bucket_handle = s3resource.Bucket(staging_bucket)
# A single list_objects_v2 call returns at most 1000 keys, so page through the
# listing to avoid silently dropping part files of large year partitions.
key_paginator = s3client.get_paginator("list_objects_v2")

for i in range(2010, 2023):
    data_keys = [
        obj["Key"]
        for page in key_paginator.paginate(Bucket=staging_bucket, Prefix=f"data/year={i}/")
        for obj in page.get("Contents", [])
    ]
    if not data_keys:
        raise KeyError(f"no objects found under s3://{staging_bucket}/data/year={i}/")

    # Read every part file, then concatenate once instead of re-copying the
    # accumulated frame on each iteration.
    df = pd.concat(
        [pd.read_parquet(get_s3_buffer(staging_bucket_handle, key)) for key in data_keys]
    )

    # One row per (station, date) with one column per datatype id.
    # NOTE(review): fillna(0) makes a missing measurement indistinguishable
    # from a measured 0, and astype requires all five columns to be present in
    # every year -- confirm both are intended.
    df = (
        df.pivot(index=["stationid", "dateid"], columns="datatypeid", values="value")
        .reset_index()
        .fillna(0)
        .astype({"TMAX": "int", "TMIN": "int", "PRCP": "int", "SNOW": "int", "SNWD": "int"})
    )
    df = df.merge(date, how="left", on="dateid").drop(columns=["dateid"])
    df = df.merge(station, how="left", left_on="stationid", right_on="id").drop(
        columns=["stationid", "id"]
    )
    df.to_csv(os.path.join("staging_data", f"clean{i}.csv"), index=False)
    print(f"year={i} done")

year=2010 done
year=2011 done
year=2012 done
year=2013 done
year=2014 done
year=2015 done
year=2016 done
year=2017 done
year=2018 done
year=2019 done
year=2020 done
year=2021 done
year=2022 done


In [10]:
import csv


def concatenate_csv_files(source_paths, destination_path):
    """Concatenate CSV files that share a header into one CSV file.

    The header row is taken from the first non-empty source file and written
    once; the first row of every other source file is treated as a header and
    skipped. Empty source files are ignored.

    Parameters
    ----------
    source_paths
        Iterable of paths to the input CSV files, in output order.
    destination_path
        Path of the CSV file to create (overwritten if it exists).
    """
    header_written = False
    # newline="" is what the csv module requires for both reading and writing,
    # so it controls line endings itself and quoted newlines survive intact.
    with open(destination_path, "w", newline="") as destination:
        writer = csv.writer(destination)
        for source_path in source_paths:
            with open(source_path, "r", newline="") as source:
                reader = csv.reader(source)
                header = next(reader, None)
                if header is None:
                    # Empty file: nothing to copy, and no header to take.
                    continue
                if not header_written:
                    writer.writerow(header)
                    header_written = True
                writer.writerows(reader)


files = [os.path.join("staging_data", f"clean{i}.csv") for i in range(2010, 2023)]

concatenate_csv_files(files, os.path.join("staging_data", "training_data.csv"))