In [31]:
import boto3
from os import path, makedirs, remove
from logging import Logger
from google.cloud import bigquery
from google.oauth2 import service_account
from typing import List
import logging
from shutil import rmtree
import pandas as pd
from datetime import datetime
import glob

# Name of the S3 bucket that every download in this file reads from.
BUCKET_NAME = "oedi-data-lake"

# Key prefixes inside the bucket; all of them hang off PARENT_PREFIX.
PARENT_PREFIX = "pvdaq/parquet/"
SITE_PREFIX = f"{PARENT_PREFIX}site/"
MOUNT_PREFIX = f"{PARENT_PREFIX}mount/"

# Key templates: the literal "{ss_id}", "{year}", "{month}" and "{day}" markers
# stay in the string (hence the doubled braces) and are filled in at
# extraction time.
METRICS_PREFIX = f"{PARENT_PREFIX}metrics/metrics__system_{{ss_id}}"
PV_PREFIX = f"{PARENT_PREFIX}pvdata/system_id={{ss_id}}/year={{year}}/month={{month}}/day={{day}}/"


class BaseExtractor:
    """
        Common S3 plumbing for the Extract step of the data pipeline.
    """

    def __init__(
            self,
            aws_access_key_id: str,
            aws_secret_access_key: str,
            region_name: str,
            staging_area: str,
            logger: Logger
            ):
        """
            Keeps the AWS settings, the staging directory and the logger on
            the instance and creates the boto3 S3 client used for downloads.
        """
        self.logger = logger
        self.staging_area = staging_area

        self.region_name = region_name
        self.aws_access_key_id = aws_access_key_id
        self.aws_secret_access_key = aws_secret_access_key

        client_settings = {
            "aws_access_key_id": aws_access_key_id,
            "aws_secret_access_key": aws_secret_access_key,
            "region_name": region_name,
        }
        self.s3 = boto3.client("s3", **client_settings)

    def s3_download(self, key: str, filename: str):
        """
            Downloads the object stored under `key` in BUCKET_NAME and saves
            it locally as `filename`.
            The directory part of `filename` must already exist.
        """
        source_bucket = BUCKET_NAME
        self.s3.download_file(source_bucket, key, filename)


class BaseLoader:
    """
        Common BigQuery plumbing for the Load step of the data pipeline.
    """

    def __init__(
            self,
            project_id: str,
            credentials_path: str,
            staging_area: str,
            logger: Logger,
            ):
        """
            Builds a BigQuery client for `project_id`, authenticated with the
            service-account key file at `credentials_path`, and remembers the
            staging directory and logger.
        """
        self.project_id = project_id
        self.credentials = service_account.Credentials.from_service_account_file(credentials_path)
        self.client = bigquery.Client(project=project_id, credentials=self.credentials)
        self.staging_area = staging_area
        self.logger = logger

    def load_to_bq(
            self,
            dataset_id: str,
            table_id: str,
            table_schema: List[bigquery.SchemaField],
            source_data_path: str,
            ):
        """
            Appends one local Parquet file to a BigQuery table.

            The table is created with `table_schema` when it does not exist
            yet. The call blocks until the load job has finished.

            Args:
                dataset_id: dataset that holds the table (in the client's project).
                table_id: name of the destination table.
                table_schema: schema used only when the table has to be created.
                source_data_path: path of the local Parquet file to upload.

            Raises:
                Whatever the BigQuery client raises for a failed lookup,
                table creation or load job; only "table not found" is handled.
        """
        # Imported here so the block does not depend on a file-level import.
        from google.cloud.exceptions import NotFound

        # Client.dataset() is deprecated; build the reference explicitly with
        # the same project the client would have defaulted to.
        table_ref = bigquery.DatasetReference(self.client.project, dataset_id).table(table_id)
        table = bigquery.Table(table_ref, schema=table_schema)

        try:
            self.client.get_table(table)
        except NotFound:
            # Only a missing table triggers creation; authentication or
            # network failures propagate instead of being mistaken for it.
            self.logger.info(f"Table {table} is not found. Creating...")
            # exists_ok tolerates another process creating the table meanwhile.
            self.client.create_table(table, exists_ok=True)

        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.PARQUET,
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            schema_update_options=[
                bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
                bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION,
            ],
        )

        with open(source_data_path, "rb") as source_file:
            job = self.client.load_table_from_file(source_file, table_ref, job_config=job_config)
        # Wait for completion so that load errors surface to the caller.
        job.result()

        return None


class PVExtract(BaseExtractor):
    """
        Extract step: downloads the metrics file and the daily PV data files
        of one system from S3 into the staging area.
    """

    def _first_key(self, prefix: str):
        """
            Returns the key of the first object listed under `prefix`,
            or None when the listing contains no objects.
        """
        listing = self.s3.list_objects(Bucket=BUCKET_NAME, Prefix=prefix, Delimiter="/")
        contents = listing.get("Contents")
        if not contents:
            return None
        return contents[0]["Key"]

    def extract_metrics(self, ss_id: int) -> None:
        """
            Extracts Metrics given an ss_id.
            Problems are logged and do not raise, so a run can continue.
        """
        metrics_aws_path = METRICS_PREFIX.format(ss_id=ss_id)
        # NOTE(review): the prefix ends right after the id, so the listing for
        # system 10 presumably also matches keys of systems 100, 1000, ... -
        # confirm the key layout and add the separator that follows the id.
        metrics_key = self._first_key(metrics_aws_path)
        if metrics_key is None:
            self.logger.error(
                f"Error while extracting metrics for Site {ss_id}: "
                f"\nno S3 object found under prefix {metrics_aws_path}"
            )
            return

        try:
            local_dir = path.join(self.staging_area, f"system{ss_id}")
            makedirs(local_dir, exist_ok=True)
            self.s3_download(metrics_key, path.join(local_dir, f"metrics_system{ss_id}.parquet"))
        except Exception as error:
            self.logger.error(f"Error while extracting metrics for Site {ss_id}: \n{error}")

    def extract_pv_data(self, ss_id: int, date: datetime) -> None:
        """
            Extracts PV data given an ss_id and date (single day).
            Problems are logged and do not raise, so the remaining days of a
            date range are still attempted.
        """
        pv_aws_path = PV_PREFIX.format(
            ss_id=ss_id,
            year=date.year,
            month=date.month,
            day=date.day,
        )
        # Only the first object listed for the day is downloaded.
        pv_data_key = self._first_key(pv_aws_path)
        if pv_data_key is None:
            self.logger.error(
                f"Error while extracting PV data for Site {ss_id} on {date}: "
                f"\nno S3 object found under prefix {pv_aws_path}"
            )
            return

        try:
            local_dir = path.join(self.staging_area, f"system{ss_id}", "pv_data")
            makedirs(local_dir, exist_ok=True)
            self.s3_download(pv_data_key, path.join(local_dir, f"pv_data_system{ss_id}_{date.strftime('%Y-%m-%d')}.parquet"))
        except Exception as error:
            self.logger.error(f"Error while extracting PV data for Site {ss_id} on {date}: \n{error}")

    def extract(self, ss_id: int, start_date: datetime, end_date: datetime) -> None:
        """
            Extracts the metrics and the PV data of every day from start_date
            to end_date (both included) for a given ss_id.
        """
        makedirs(self.staging_area, exist_ok=True)

        self.logger.info(f"Extracting Metrics for System {ss_id}...")
        self.extract_metrics(ss_id)

        self.logger.info(f"Extracting PV data for System {ss_id} for dates: {start_date} to {end_date}")
        for date in pd.date_range(start=start_date, end=end_date):
            self.extract_pv_data(ss_id, date)



class PVTransform:
    """
        Transform step: joins the staged PV readings of a system with its
        metrics metadata and writes one merged Parquet file.
    """

    def __init__(self, staging_area: str, logger: Logger):
        self.staging_area = staging_area
        self.logger = logger

    @staticmethod
    def _merge_readings(pv_data: pd.DataFrame, metrics: pd.DataFrame) -> pd.DataFrame:
        """
            Joins the readings with the metrics of the sensors of interest on
            "metric_id" and returns the columns
            ss_id, units, timestamp, value, sensor.
        """
        wanted = metrics[metrics["sensor_name"].isin(["dc_power", "ac_power", "poa_irradiance"])]

        merged = pd.merge(pv_data, wanted, how="inner", on="metric_id")
        merged = merged.rename(
            columns={
                "system_id": "ss_id",
                "measured_on": "timestamp",
                "sensor_name": "sensor",
                "raw_units": "units",
            }
        )
        merged = merged[["ss_id", "units", "timestamp", "value", "sensor"]]
        # Empty strings become missing values before the numeric conversion.
        merged = merged.replace("", pd.NA)

        # Values that cannot be parsed as numbers are turned into NaN.
        for col in ("ss_id", "value"):
            merged[col] = pd.to_numeric(merged[col], errors="coerce")

        return merged

    def transform(self, ss_id: int):
        """
            Transforms all the data present in the staging area for ss_id.

            Reads the metrics file and every daily PV file staged for the
            system, writes "pv_data_merged.parquet" into the system's staging
            directory and then deletes the raw inputs. Errors are logged and
            not raised; the raw inputs are kept when anything fails.
        """
        system_dir = path.join(self.staging_area, f"system{ss_id}")
        metrics_file = path.join(system_dir, f"metrics_system{ss_id}.parquet")
        pv_dir = path.join(system_dir, "pv_data")

        try:
            self.logger.info(f"Transforming PV data for System {ss_id}...")

            metrics_cols = ["system_id", "metric_id", "sensor_name", "raw_units"]
            metrics = pd.read_parquet(metrics_file, columns=metrics_cols)

            # Pick up every staged day instead of a single fixed date.
            pv_files = sorted(glob.glob(path.join(pv_dir, f"pv_data_system{ss_id}_*.parquet")))
            if not pv_files:
                raise FileNotFoundError(f"no PV data files found in {pv_dir}")

            pv_cols = ["measured_on", "metric_id", "value"]
            pv_data = pd.concat(
                (pd.read_parquet(pv_file, columns=pv_cols) for pv_file in pv_files),
                ignore_index=True,
            )

            merged = self._merge_readings(pv_data, metrics)

            # Written next to the inputs of the same system.
            merged.to_parquet(path.join(system_dir, "pv_data_merged.parquet"))

            # Raw inputs are removed only after the merged file was written.
            rmtree(pv_dir)
            remove(metrics_file)

        except Exception as error:
            self.logger.error(f"Error while transforming PV data for System {ss_id}: \n{error}")

        return None
    

class PVLoad(BaseLoader):
    """
        Load step: uploads the merged PV data staged for a system into the
        BigQuery table pv_oedi.pv_data.
    """

    def load(self, ss_id: int):
        """
            Uploads the merged PV data of `ss_id`.

            "pv_data_merged.parquet" in the system's staging directory may be
            either a single Parquet file or a directory of "*.parquet" part
            files; every file found is appended to the table.

            Raises:
                The exception of the first upload that fails, after logging it.
        """
        # NOTE(review): the staged "timestamp" column is datetime64[ns]; confirm
        # that BigQuery accepts it for a DATETIME column (vs. TIMESTAMP).
        table_schema = [
            bigquery.SchemaField("ss_id", "INTEGER", "NULLABLE"),
            bigquery.SchemaField("timestamp", "DATETIME"),
            bigquery.SchemaField("sensor", "STRING"),
            bigquery.SchemaField("units", "STRING"),
            bigquery.SchemaField("value", "FLOAT"),
            ]

        merged_path = path.join(self.staging_area, f"system{ss_id}", "pv_data_merged.parquet")
        if path.isdir(merged_path):
            # Partitioned output: one upload per part file, in a stable order.
            parquet_files = sorted(glob.glob(path.join(merged_path, "*.parquet")))
        elif path.isfile(merged_path):
            # Single-file output, as written by DataFrame.to_parquet.
            parquet_files = [merged_path]
        else:
            parquet_files = []

        if not parquet_files:
            self.logger.warning(f"No merged PV data found for system {ss_id} at {merged_path}")
            return None

        self.logger.info(f"Uploading PV data for system {ss_id}")
        for file in parquet_files:
            try:
                self.load_to_bq(
                    dataset_id="pv_oedi",
                    table_id="pv_data",
                    table_schema=table_schema,
                    source_data_path=file
                )
            except Exception as error:
                self.logger.error(f"Error while loading PV data from {file} into BigQuery: \n{error}")
                # Bare raise keeps the original traceback.
                raise

        return None


In [32]:
# Loader for the "cohere-pv-pipeline" project, reading from ./staging_area.
loader = PVLoad(
    logger=logging.getLogger(__name__),
    staging_area="staging_area",
    credentials_path="./bq_service_account_key.json",
    project_id="cohere-pv-pipeline",
)

In [33]:
# Upload the merged PV data staged for system 10.
loader.load(10)

In [22]:
# Print the column dtypes and non-null counts of one staged part file.
pd.read_parquet("./staging_area/system10/pv_data_merged.parquet/part-00002-d7237e7c-789a-4dc5-a879-cf6686bee8f7-c000.snappy.parquet").info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4320 entries, 0 to 4319
Data columns (total 5 columns):
 #   Column     Non-Null Count  Dtype         
---  ------     --------------  -----         
 0   ss_id      4320 non-null   int32         
 1   timestamp  4320 non-null   datetime64[ns]
 2   sensor     4320 non-null   object        
 3   units      4320 non-null   object        
 4   value      4320 non-null   float64       
dtypes: datetime64[ns](1), float64(1), int32(1), object(2)
memory usage: 152.0+ KB


In [8]:
import os

# The AWS key pair is read from the environment so that no secret is stored
# in the notebook; a missing variable fails fast with a KeyError.
# NOTE(review): the key pair that used to be written here in clear text must
# be treated as leaked - rotate it.
extractor = PVExtract(
    staging_area="staging_area",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
    region_name="us-west-2",
    logger=logging.getLogger(__name__)
)

# System and inclusive date range to extract.
ss_id = 10
start_date = datetime.strptime("2010/03/01", "%Y/%m/%d")
end_date = datetime.strptime("2010/03/10", "%Y/%m/%d")

extractor.extract(
    ss_id=ss_id,
    start_date=start_date,
    end_date=end_date
)

In [7]:
# Run the Transform step for `ss_id`, which must already be defined in the
# notebook session.
transformer = PVTransform(
    logger=logging.getLogger(__name__),
    staging_area="./staging_area/",
)
transformer.transform(ss_id)

In [12]:
import glob

In [13]:
# List the "*.parquet" part files inside the merged-output directory of system 10.
glob.glob("./staging_area/system10/pv_data_merged.parquet/*.parquet")

['./staging_area/system10/pv_data_merged.parquet\\part-00000-d7237e7c-789a-4dc5-a879-cf6686bee8f7-c000.snappy.parquet',
 './staging_area/system10/pv_data_merged.parquet\\part-00001-d7237e7c-789a-4dc5-a879-cf6686bee8f7-c000.snappy.parquet',
 './staging_area/system10/pv_data_merged.parquet\\part-00002-d7237e7c-789a-4dc5-a879-cf6686bee8f7-c000.snappy.parquet',
 './staging_area/system10/pv_data_merged.parquet\\part-00003-d7237e7c-789a-4dc5-a879-cf6686bee8f7-c000.snappy.parquet',
 './staging_area/system10/pv_data_merged.parquet\\part-00004-d7237e7c-789a-4dc5-a879-cf6686bee8f7-c000.snappy.parquet',
 './staging_area/system10/pv_data_merged.parquet\\part-00005-d7237e7c-789a-4dc5-a879-cf6686bee8f7-c000.snappy.parquet',
 './staging_area/system10/pv_data_merged.parquet\\part-00006-d7237e7c-789a-4dc5-a879-cf6686bee8f7-c000.snappy.parquet',
 './staging_area/system10/pv_data_merged.parquet\\part-00007-d7237e7c-789a-4dc5-a879-cf6686bee8f7-c000.snappy.parquet',
 './staging_area/system10/pv_data_merged