## 1. initialize

In [1]:
from pathlib import Path
import os
import getpass
import shutil

from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

In [2]:
# NOTE(review): displaying the whole environment stores every variable (including any
# credentials or tokens set on the host) in the saved notebook output; prefer looking
# up only the specific keys that are needed.
os.environ

environ{'PATH': '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin',
        'HOME': '/home/hadoop',
        'LOGNAME': 'hadoop',
        'USER': 'hadoop',
        'SHELL': '/bin/bash',
        'JAVA_HOME': '/etc/alternatives/jre',
        'SPARK_HOME': '/usr/lib/spark',
        'LC_CTYPE': 'C.UTF-8',
        'PYDEVD_USE_FRAME_EVAL': 'NO',
        'JPY_PARENT_PID': '16982',
        'TERM': 'xterm-color',
        'CLICOLOR': '1',
        'PAGER': 'cat',
        'GIT_PAGER': 'cat',
        'MPLBACKEND': 'module://matplotlib_inline.backend_inline'}

## 2. start spark cluster

In [3]:
# tweak setting here:
def init_spark(cluster=None, name="dsgrid", tz="UTC"):
    """Create (or fetch) a SparkSession configured for this notebook.

    Parameters
    ----------
    cluster : str or None
        ``None`` selects Spark local mode (master ``"local"``).
        ``"AWS"`` sets no master at all (the AWS cluster does not need one).
        Any other value is passed to ``SparkConf.setMaster`` as the master URL.
    name : str
        Spark application name.
    tz : str
        Value assigned to ``spark.sql.session.timeZone``.

    Returns
    -------
    SparkSession
        The session returned by ``SparkSession.builder.getOrCreate()``.
    """
    conf = SparkConf().setAppName(name)

    # Decide the master on the conf itself so the session is built exactly once,
    # after every setting is in place (previously local mode created a session
    # first and then called getOrCreate() a second time).
    if cluster is None:
        conf = conf.setMaster("local")
    elif cluster != "AWS":
        conf = conf.setMaster(cluster)
    # cluster == "AWS": does not need to setMaster for AWS cluster

    conf = conf.setAll([
        # Optional tuning knobs; uncomment and adjust as needed:
#             ("spark.sql.shuffle.partitions", "200"),
#             ("spark.executor.instances", "7"),
#             ("spark.executor.cores", "5"),
#             ("spark.executor.memory", "10g"),
#             ("spark.driver.memory", "10g"),
#             ("spark.dynamicAllocation.enabled", True),
#             ("spark.shuffle.service.enabled", True),
        ("spark.sql.session.timeZone", tz),
    ])
    return SparkSession.builder.config(conf=conf).getOrCreate()

To launch a standalone cluster or a cluster on Eagle, follow **instructions** here: \
https://github.com/dsgrid/dsgrid/tree/main/dev#spark-standalone-cluster

Accordingly, uncomment and update the cluster name below:

In [4]:
# Spark session time zone; forwarded to init_spark(tz=...) below.
main_tz = "EST" # <--- UTC, EST

# Pick exactly one of the cluster options below by uncommenting its lines.

### STAND-ALONE CLUSTER
# cluster = "spark://lliu2-34727s:7077"
# name = "stand-alone"

### CLUSTER ON HPC - Type in nodename
# NODENAME = "r103u23" # <--- change after deploying cluster
# cluster = f"spark://{NODENAME}.ib0.cm.hpc.nrel.gov:7077" 
# name = "HPC"

### CLUSTER ON HPC - Get cluster from file dropped by prep_spark_cluster_notebook.py
# import toml
# config = toml.load("cluster.toml")
# cluster = config["cluster"]
# name = "HPC"

### LOCAL MODE
# cluster = None 
# name = "local"

### AWS MODE
cluster = "AWS"
# NOTE(review): `name` is assigned here but the init_spark call below passes the
# literal 'dsgrid-load' as the application name — confirm which one is intended.
name = "AWS"

# Initialize
spark = init_spark(cluster, 'dsgrid-load', tz=main_tz)

# get Spark Context UI
sc = spark.sparkContext
sc

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/18 01:50:11 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


#### The *Spark UI* above works only for local mode. For HPC cluster Spark UI, use:
http://localhost:8080

In [5]:
# Print the effective Spark configuration, one (key, value) tuple per line, in sorted order.
spark_settings = sorted(sc.getConf().getAll())
for setting in spark_settings:
    print(setting)

('spark.app.id', 'application_1655516815883_0001')
('spark.app.name', 'dsgrid-load')
('spark.app.startTime', '1655517008666')
('spark.blacklist.decommissioning.enabled', 'true')
('spark.blacklist.decommissioning.timeout', '1h')
('spark.decommissioning.timeout.threshold', '20')
('spark.driver.appUIAddress', 'http://ip-172-18-27-10.us-west-2.compute.internal:4040')
('spark.driver.defaultJavaOptions', "-XX:OnOutOfMemoryError='kill -9 %p'")
('spark.driver.extraClassPath', '/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/goodies/lib/emr-spark-goodies.jar:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/usr/share/aws/emr/s3select/lib/emr-s3-select-spark-connector.jar:/docker/usr/lib/hadoop-lzo/lib/*:/docker/usr/lib/hadoop/hadoop-aws.jar:

## 3. dsgrid

In [6]:
# `display` and `HTML` are imported from the public `IPython.display` module; importing
# `display` from `IPython.core.display` has been deprecated since IPython 7.14.
from IPython.display import display, HTML

# Widen the notebook's `.container` element to the full page width.
display(HTML("<style>.container { width:100% !important; }</style>"))
import pandas as pd
pd.set_option('display.max_rows', 20)
# import plotly
# pd.options.plotting.backend = "plotly"
import numpy as np
import itertools
import pytz
from datetime import datetime, timedelta

from semver import VersionInfo
from pydantic import ValidationError
import pyspark.sql.functions as F
import pyspark.sql.types as sparktypes

In [7]:
from dsgrid.common import LOCAL_REGISTRY
from dsgrid.registry.registry_manager import RegistryManager
from dsgrid.utils.files import load_data
from dsgrid.utils.spark import create_dataframe, read_dataframe, get_unique_values
from dsgrid.dimension.base_models import DimensionType
from dsgrid.dataset.dataset import Dataset
from dsgrid.project import Project
from dsgrid.dimension.time import TimeZone

## 3.1. Check dsgrid registry

In [8]:
## Registry location (the registry is synced first, then loaded offline).
## The DSGRID_REGISTRY_PATH environment variable wins; otherwise fall back to LOCAL_REGISTRY.
#LOCAL_REGISTRY = "s3://nrel-dsgrid-registry-archive"
registry_path = os.environ.get("DSGRID_REGISTRY_PATH", LOCAL_REGISTRY)
registry_path

PosixPath('/home/hadoop/.dsgrid-registry')

In [9]:
# Toggle: when True, the registry is loaded once with offline_mode=False before the
# offline load that follows (the captured output shows files being downloaded).
sync_and_pull = True # <--- registry config only
if sync_and_pull:
    print(f"syncing registry: {registry_path}")
    # The returned manager is intentionally discarded; only the side effect is wanted here.
    RegistryManager.load(registry_path, offline_mode=False)

syncing registry: /home/hadoop/.dsgrid-registry
download: s3://nrel-dsgrid-registry/configs/datasets/conus_2022_reference_resstock/1.0.0/dataset.toml to ../.dsgrid-registry/configs/datasets/conus_2022_reference_resstock/1.0.0/dataset.toml
download: s3://nrel-dsgrid-registry/configs/datasets/conus_2022_reference_resstock/registry.toml to ../.dsgrid-registry/configs/datasets/conus_2022_reference_resstock/registry.toml
download: s3://nrel-dsgrid-registry/configs/datasets/tempo_conus_2022/1.0.0/dataset.toml to ../.dsgrid-registry/configs/datasets/tempo_conus_2022/1.0.0/dataset.toml
download: s3://nrel-dsgrid-registry/configs/dimension_mappings/acs_county_2018__us_counties_2020_l48__d3845570-48dc-4b7e-b4de-465272a0007c/registry.toml to ../.dsgrid-registry/configs/dimension_mappings/acs_county_2018__us_counties_2020_l48__d3845570-48dc-4b7e-b4de-465272a0007c/registry.toml
download: s3://nrel-dsgrid-registry/configs/datasets/conus_2022_reference_comstock/registry.toml to ../.dsgrid-registry/co

Completed 257.6 KiB/~1022.4 KiB (503.8 KiB/s) with ~69 file(s) remaining (calculating...)download: s3://nrel-dsgrid-registry/configs/dimension_mappings/tempo_ldv_pev_usage__conus-2022-detailed-end-uses-kwh__370a689f-d82d-4b17-9c89-3b70050a0413/1.0.0/metric_to_metric.csv to ../.dsgrid-registry/configs/dimension_mappings/tempo_ldv_pev_usage__conus-2022-detailed-end-uses-kwh__370a689f-d82d-4b17-9c89-3b70050a0413/1.0.0/metric_to_metric.csv
Completed 257.6 KiB/~1022.6 KiB (503.8 KiB/s) with ~69 file(s) remaining (calculating...)Completed 303.6 KiB/~1022.6 KiB (592.6 KiB/s) with ~69 file(s) remaining (calculating...)download: s3://nrel-dsgrid-registry/configs/dimension_mappings/tempo_bin__conus-2022-detailed-subsectors__006c7da2-cf2b-4863-b802-0d0f8cca1306/1.0.0/dimension_mapping.toml to ../.dsgrid-registry/configs/dimension_mappings/tempo_bin__conus-2022-detailed-subsectors__006c7da2-cf2b-4863-b802-0d0f8cca1306/1.0.0/dimension_mapping.toml
Completed 303.6 KiB/~1022.6 KiB (592.6 KiB/s) wi

Completed 591.1 KiB/~1.2 MiB (856.3 KiB/s) with ~84 file(s) remaining (calculating...)Completed 591.2 KiB/~1.2 MiB (847.7 KiB/s) with ~84 file(s) remaining (calculating...)download: s3://nrel-dsgrid-registry/configs/dimensions/data_source/comstock-data-source__8fbe464f-ad5d-460b-bd2d-ee25625a8a27/1.0.0/data_source_comstock.csv to ../.dsgrid-registry/configs/dimensions/data_source/comstock-data-source__8fbe464f-ad5d-460b-bd2d-ee25625a8a27/1.0.0/data_source_comstock.csv
Completed 591.2 KiB/~1.2 MiB (847.7 KiB/s) with ~84 file(s) remaining (calculating...)Completed 591.5 KiB/~1.2 MiB (831.4 KiB/s) with ~84 file(s) remaining (calculating...)Completed 591.8 KiB/~1.2 MiB (830.8 KiB/s) with ~84 file(s) remaining (calculating...)Completed 592.1 KiB/~1.2 MiB (830.8 KiB/s) with ~85 file(s) remaining (calculating...)Completed 592.4 KiB/~1.2 MiB (830.5 KiB/s) with ~85 file(s) remaining (calculating...)download: s3://nrel-dsgrid-registry/configs/dimension_mappings/us_counties_2020_l48__us_st

Completed 763.9 KiB/~1.2 MiB (852.6 KiB/s) with ~96 file(s) remaining (calculating...)Completed 764.4 KiB/~1.2 MiB (844.7 KiB/s) with ~96 file(s) remaining (calculating...)Completed 764.6 KiB/~1.2 MiB (838.1 KiB/s) with ~98 file(s) remaining (calculating...)download: s3://nrel-dsgrid-registry/configs/dimensions/geography/reeds_pca__f002c1f9-9e4c-4c77-8c50-71f9f46ece3e/registry.toml to ../.dsgrid-registry/configs/dimensions/geography/reeds_pca__f002c1f9-9e4c-4c77-8c50-71f9f46ece3e/registry.toml
Completed 764.6 KiB/~1.2 MiB (838.1 KiB/s) with ~97 file(s) remaining (calculating...)download: s3://nrel-dsgrid-registry/configs/dimensions/geography/conus_2022-comstock_us_county_fip__2bc5b3dc-6ec0-4e3c-8d7f-5af5dba0b16f/registry.toml to ../.dsgrid-registry/configs/dimensions/geography/conus_2022-comstock_us_county_fip__2bc5b3dc-6ec0-4e3c-8d7f-5af5dba0b16f/registry.toml
Completed 764.6 KiB/~1.2 MiB (838.1 KiB/s) with ~96 file(s) remaining (calculating...)Completed 850.3 KiB/~1.2 MiB (919.2

Completed 1.1 MiB/~1.2 MiB (1.1 MiB/s) with ~104 file(s) remaining (calculating...)Completed 1.1 MiB/~1.2 MiB (1.0 MiB/s) with ~105 file(s) remaining (calculating...)Completed 1.1 MiB/~1.3 MiB (1.0 MiB/s) with ~106 file(s) remaining (calculating...)download: s3://nrel-dsgrid-registry/configs/dimensions/geography/us_counties_2010_-_comstock_only__65265034-45f2-439f-ab3c-cece7b20ecab/1.0.0/comstock_counties.csv to ../.dsgrid-registry/configs/dimensions/geography/us_counties_2010_-_comstock_only__65265034-45f2-439f-ab3c-cece7b20ecab/1.0.0/comstock_counties.csv
Completed 1.1 MiB/~1.3 MiB (1.0 MiB/s) with ~105 file(s) remaining (calculating...)Completed 1.2 MiB/~1.3 MiB (1.1 MiB/s) with ~105 file(s) remaining (calculating...)download: s3://nrel-dsgrid-registry/configs/dimensions/geography/us_states__4e348f5d-4976-4953-9dde-3afa30af303e/registry.toml to ../.dsgrid-registry/configs/dimensions/geography/us_states__4e348f5d-4976-4953-9dde-3afa30af303e/registry.toml
Completed 1.2 MiB/~1.3 M

Completed 1.2 MiB/~1.4 MiB (1005.4 KiB/s) with ~108 file(s) remaining (calculating...)Completed 1.2 MiB/~1.4 MiB (1005.1 KiB/s) with ~109 file(s) remaining (calculating...)download: s3://nrel-dsgrid-registry/configs/dimensions/metric/conus_2022-resstock_energy_enduses_kwh__1d476bea-ce82-4e88-9ed4-d09c39824876/registry.toml to ../.dsgrid-registry/configs/dimensions/metric/conus_2022-resstock_energy_enduses_kwh__1d476bea-ce82-4e88-9ed4-d09c39824876/registry.toml
Completed 1.2 MiB/~1.4 MiB (1005.1 KiB/s) with ~108 file(s) remaining (calculating...)download: s3://nrel-dsgrid-registry/configs/dimensions/metric/tempo_ldv_pev_usage__13436861-0cf0-4df1-80bd-895311bce819/1.0.0/end_use.csv to ../.dsgrid-registry/configs/dimensions/metric/tempo_ldv_pev_usage__13436861-0cf0-4df1-80bd-895311bce819/1.0.0/end_use.csv
Completed 1.2 MiB/~1.4 MiB (1005.1 KiB/s) with ~108 file(s) remaining (calculating...)download: s3://nrel-dsgrid-registry/configs/dimensions/model_year/model-year-2018__6f4a5fde-b7a6

Completed 1.2 MiB/1.6 MiB (782.2 KiB/s) with 77 file(s) remainingCompleted 1.2 MiB/1.6 MiB (774.2 KiB/s) with 77 file(s) remainingdownload: s3://nrel-dsgrid-registry/configs/dimensions/subsector/conus_2022-resstock_building_types__2dadfec2-3fa1-46bd-af7d-0ec3afab9a9e/registry.toml to ../.dsgrid-registry/configs/dimensions/subsector/conus_2022-resstock_building_types__2dadfec2-3fa1-46bd-af7d-0ec3afab9a9e/registry.toml
Completed 1.2 MiB/1.6 MiB (774.2 KiB/s) with 76 file(s) remainingCompleted 1.2 MiB/1.6 MiB (772.0 KiB/s) with 76 file(s) remainingdownload: s3://nrel-dsgrid-registry/configs/dimensions/subsector/conus_2022-comstock_building_types__1c099f75-75a9-43d2-8469-1e88384f2889/1.0.0/conus_2022-comstock_subsectors.csv to ../.dsgrid-registry/configs/dimensions/subsector/conus_2022-comstock_building_types__1c099f75-75a9-43d2-8469-1e88384f2889/1.0.0/conus_2022-comstock_subsectors.csv
Completed 1.2 MiB/1.6 MiB (772.0 KiB/s) with 75 file(s) remainingCompleted 1.2 MiB/1.6 MiB (770.3 K

Completed 1.4 MiB/1.6 MiB (792.9 KiB/s) with 40 file(s) remainingCompleted 1.4 MiB/1.6 MiB (802.2 KiB/s) with 40 file(s) remainingCompleted 1.4 MiB/1.6 MiB (802.4 KiB/s) with 40 file(s) remainingCompleted 1.4 MiB/1.6 MiB (802.2 KiB/s) with 40 file(s) remainingdownload: s3://nrel-dsgrid-registry/configs/projects/dsgrid_conus_2022/1.1.0/data_source__sector.csv to ../.dsgrid-registry/configs/projects/dsgrid_conus_2022/1.1.0/data_source__sector.csv
Completed 1.4 MiB/1.6 MiB (802.2 KiB/s) with 39 file(s) remainingCompleted 1.4 MiB/1.6 MiB (800.5 KiB/s) with 39 file(s) remainingCompleted 1.4 MiB/1.6 MiB (801.2 KiB/s) with 39 file(s) remainingCompleted 1.4 MiB/1.6 MiB (801.8 KiB/s) with 39 file(s) remainingdownload: s3://nrel-dsgrid-registry/configs/projects/dsgrid_conus_2022/1.0.0/subsector__metric.csv to ../.dsgrid-registry/configs/projects/dsgrid_conus_2022/1.0.0/subsector__metric.csv
Completed 1.4 MiB/1.6 MiB (801.8 KiB/s) with 38 file(s) remainingdownload: s3://nrel-dsgrid-regis

Completed 1.6 MiB/1.6 MiB (821.1 KiB/s) with 2 file(s) remainingCompleted 1.6 MiB/1.6 MiB (824.2 KiB/s) with 2 file(s) remainingdownload: s3://nrel-dsgrid-registry/configs/projects/dsgrid_conus_2022/1.3.0/project.toml to ../.dsgrid-registry/configs/projects/dsgrid_conus_2022/1.3.0/project.toml
Completed 1.6 MiB/1.6 MiB (824.2 KiB/s) with 1 file(s) remainingCompleted 1.6 MiB/1.6 MiB (819.4 KiB/s) with 1 file(s) remainingdownload: s3://nrel-dsgrid-registry/configs/projects/dsgrid_conus_2022/1.2.0/scenario__data_source.csv to ../.dsgrid-registry/configs/projects/dsgrid_conus_2022/1.2.0/scenario__data_source.csv


In [10]:
# Whether the registry (and, later, the project) is loaded in offline mode.
# TODO(review, ETH): the arrow marker below has no note — state which values are valid.
offline_mode = True # <---

registry_mgr = RegistryManager.load(registry_path, offline_mode=offline_mode)
# Convenience handles to the individual managers exposed by the registry manager.
project_mgr = registry_mgr.project_manager
dataset_mgr = registry_mgr.dataset_manager
dim_map_mgr = registry_mgr.dimension_mapping_manager
dim_mgr = registry_mgr.dimension_manager
# Confirmation message, printed only after the load above has returned.
print(f"Loaded dsgrid registry at: {registry_path}")

Loaded dsgrid registry at: /home/hadoop/.dsgrid-registry


In [11]:
# Show the registered projects without the Date and Submitter fields
# (max_width=30 presumably caps the rendered column width — confirm).
project_mgr.show(max_width=30, drop_fields=["Date", "Submitter"])

                                                                                

ID,Version,Status,Datasets,Description
dsgrid_conus_2022,1.3.0,Complete,"tempo_conus_2022: Registered, conus_2022_reference_resstock: Registered, conus_2022_reference_comstock: Registered",Dataset created for the FY21 dsgrid Load Profile Tool for Grid Modeling project


In [12]:
# %%timeit
# ## Dan's test
# from dsgrid.config.time_dimension_base_config import TimeDimensionBaseConfig

# i = 0
# for d_id in registry_mgr.dimension_manager._id_to_type:
#     config = registry_mgr.dimension_manager.get_by_id(d_id)
#     if not isinstance(config, TimeDimensionBaseConfig):
#         config.get_records_dataframe().count()
#         i += 1
        
# print(i)

## 3.2. Load Project
This section is mostly exploratory (For *Section 4. Queries*, only need to load project) 

####  Some user criteria:
At the project level, I want to be able to:
- Examine what's available in the project:
    * Show project dimensions by type, show resolution by type - I don't care: base/supplemental, mappings, id
    * Get unique records by dimension/resolution
    * Get unique records by selected dimension sets
    * Show mapped dataset
    * Show unit (or select a unit of analysis) and fuel types
- Make queries using:
    * Project dimensions + fuel types + time resolutions
    * Get all types of statistics (max, mean, min, percentiles, count, sum)
    
- dataset level: never mapped, think TEMPO,
- interface to allow for query optimization
    
#### Notes:
 * Project_manager has access to all other managers.
 * Each manager has the responsibility to retrieve configs
 * Access ConfigModel from configs

In [13]:
# load project (reuses the offline_mode flag set when the registry was loaded)
project_id = "dsgrid_conus_2022" # <---
project = Project.load(project_id, offline_mode=offline_mode)

print("project loaded")

project loaded


## 3.3. Load Project Datasets

### 3.3.3. TEMPO

load and check tempo dataset here

In [14]:
# Load the TEMPO dataset into the project, then fetch the loaded dataset object.
dataset_id = "tempo_conus_2022" # <----
project.load_dataset(dataset_id)
tempo = project.get_dataset(dataset_id)
print("tempo dataset loaded")

22/06/18 01:51:09 WARN BasicProfileConfigFileLoader: Unable to load config file /home/hadoop/.aws/config
java.lang.IllegalArgumentException: Invalid property format: no '=' character is found on line 1
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.auth.profile.internal.AbstractProfilesConfigFileScanner.parsePropertyLine(AbstractProfilesConfigFileScanner.java:162)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.auth.profile.internal.AbstractProfilesConfigFileScanner.run(AbstractProfilesConfigFileScanner.java:119)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader$ProfilesConfigFileLoaderHelper.parseProfileProperties(BasicProfileConfigLoader.java:130)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader.loadProfiles(BasicProfileConfigLoader.java:83)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader.loadProfiles(BasicProfileConfigLoa

[Stage 23:>                                                         (0 + 1) / 1]

tempo dataset loaded


                                                                                

In [15]:
### TO BE DELETED ###
# Take the lookup table and the load-data table off the dataset object.
tempo_load_data_lookup = tempo.load_data_lookup
tempo_load_data = tempo.load_data

# Alternative kept for reference: read the load data from a parquet file instead.
# file = "/scratch/dthom/tempo_load_data3.parquet" # <---
# tempo_load_data = spark.read.parquet(file)

In [16]:
# NOTE(review): this reaches into private members (`_handler`, `_remap_dimension_columns`).
# Presumably it remaps the dataset's dimension columns to the project's dimensions —
# confirm, and switch to a public API if one exists.
tempo_mapped_load_data_lookup = tempo._handler._remap_dimension_columns(tempo_load_data_lookup)
tempo_mapped_load_data = tempo._handler._remap_dimension_columns(tempo_load_data)

In [17]:
# Remove the unmapped names from the namespace. After this, the remapping cell cannot be
# re-run without first re-running the cell that defines these two variables.
del tempo_load_data_lookup
del tempo_load_data

## 4. Queries
### Query util functions

### 4.1. Hourly electricity consumption by *scenario, model_year, and ReEDS PCA*

In [18]:
### Mapping records: all end uses -> electric end uses

dim_map_id = "conus-2022-detailed-end-uses-kwh__all-electric-end-uses__c4149547-1209-4ce3-bb4c-3ab292067e8a" # <---
electric_enduses_map = (
    dim_map_mgr
    .get_by_id(dim_map_id)
    .get_records_dataframe()
)

### Project electric end uses: keep the rows that have a non-null `to_id` and
### collect their `from_id` values into a plain Python list.
electric_enduses = (
    electric_enduses_map
    .where("to_id is not NULL")
    .select("from_id")
    .toPandas()["from_id"]
    .to_list()
)
electric_enduses

['district_cooling_cooling',
 'electricity_cooling',
 'electricity_exterior_lighting',
 'electricity_fans',
 'electricity_heat_recovery',
 'electricity_heat_rejection',
 'electricity_heating',
 'electricity_interior_equipment',
 'electricity_interior_lighting',
 'electricity_pumps',
 'electricity_refrigeration',
 'electricity_water_systems',
 'electricity_bath_fan',
 'electricity_ceiling_fan',
 'electricity_clothes_dryer',
 'electricity_clothes_washer',
 'electricity_cooking_range',
 'electricity_dishwasher',
 'electricity_exterior_holiday_lighting',
 'electricity_extra_refrigerator',
 'electricity_fans_cooling',
 'electricity_fans_heating',
 'electricity_freezer',
 'electricity_garage_lighting',
 'electricity_heating_supplemental',
 'electricity_hot_tub_heater',
 'electricity_hot_tub_pump',
 'electricity_house_fan',
 'electricity_plug_loads',
 'electricity_pool_heater',
 'electricity_pool_pump',
 'electricity_pumps_cooling',
 'electricity_pumps_heating',
 'electricity_range_fan',
 'el

In [19]:
### county-to-PCA map
# Records of the registered dimension mapping with the ID below, as a Spark DataFrame
# with columns from_id, to_id and from_fraction (see the displayed rows).
dim_map_id = "us_counties_2020_l48__reeds_pca__fcc554e1-87c9-483f-89e3-a0df9563cf89" # <---
county_to_pca_map = dim_map_mgr.get_by_id(dim_map_id).get_records_dataframe()
county_to_pca_map.show()


+-------+-----+-------------+
|from_id|to_id|from_fraction|
+-------+-----+-------------+
|  01001|  p90|          1.0|
|  01003|  p90|          1.0|
|  01005|  p90|          1.0|
|  01007|  p89|          1.0|
|  01009|  p90|          1.0|
|  01011|  p90|          1.0|
|  01013|  p90|          1.0|
|  01015|  p90|          1.0|
|  01017|  p90|          1.0|
|  01019|  p90|          1.0|
|  01021|  p90|          1.0|
|  01023|  p89|          1.0|
|  01025|  p90|          1.0|
|  01027|  p90|          1.0|
|  01029|  p90|          1.0|
|  01031|  p90|          1.0|
|  01033|  p89|          1.0|
|  01035|  p90|          1.0|
|  01037|  p90|          1.0|
|  01039|  p90|          1.0|
+-------+-----+-------------+
only showing top 20 rows



### 4.1.3. TEMPO
query TEMPO data here

In [20]:
## Time-zone map, read from a CSV file rather than from the registry (not registered)
timezone_file = "s3://nrel-dsgrid-int-scratch/scratch-lliu2/county_fip_to_local_prevailing_time.csv" # "/scratch/lliu2/project_county_timezone/county_fip_to_local_prevailing_time.csv"
tz_map = (
    spark.read
    .option("header", True)
    .csv(timezone_file)
    # add a constant from_fraction column with the value 1
    .withColumn("from_fraction", F.lit(1))
)
tz_map.show()

+-------+-----------------+-------------+
|from_id|            to_id|from_fraction|
+-------+-----------------+-------------+
|  01001|CentralPrevailing|            1|
|  01003|CentralPrevailing|            1|
|  01005|CentralPrevailing|            1|
|  01007|CentralPrevailing|            1|
|  01009|CentralPrevailing|            1|
|  01011|CentralPrevailing|            1|
|  01013|CentralPrevailing|            1|
|  01015|CentralPrevailing|            1|
|  01017|CentralPrevailing|            1|
|  01019|CentralPrevailing|            1|
|  01021|CentralPrevailing|            1|
|  01023|CentralPrevailing|            1|
|  01025|CentralPrevailing|            1|
|  01027|CentralPrevailing|            1|
|  01029|CentralPrevailing|            1|
|  01031|CentralPrevailing|            1|
|  01033|CentralPrevailing|            1|
|  01035|CentralPrevailing|            1|
|  01037|CentralPrevailing|            1|
|  01039|CentralPrevailing|            1|
+-------+-----------------+-------

In [21]:
### Electric end-use columns present in the mapped TEMPO load data,
### kept in the order in which they appear in the DataFrame.
tra_elec_enduses = [
    column_name
    for column_name in tempo_mapped_load_data.columns
    if column_name in electric_enduses
]
tra_elec_enduses

['electricity_ev_dcfc', 'electricity_ev_l1l2']

In [22]:
### TO BE DELETED
# tempo_mapped_load_data_lookup = tempo_mapped_load_data_lookup.filter("id in ('1621180393', '770011011', '1058530452')")
# tempo_mapped_load_data = tempo_mapped_load_data.filter("id in ('1621180393', '770011011', '1058530452')")

In [23]:
%%time
## 0. consolidate load_data: keep the id and the time columns and add one
##    "electricity" column holding the sum of all electric end-use columns
# TODO (carried over): make get_time_cols accessible at dataset level
tra_elec_kwh = tempo_mapped_load_data.select(
    *["id", "day_of_week", "hour", "month"],
    # built-in sum() starts from 0, so this builds 0 + col_1 + col_2 + ...
    sum(F.col(enduse) for enduse in tra_elec_enduses).alias("electricity"),
)
# tra_elec_kwh.show()

CPU times: user 3.15 ms, sys: 401 µs, total: 3.55 ms
Wall time: 19.9 ms


In [24]:
%%time
## 1. attach a time zone to every lookup row (left join of geography against tz_map.from_id)
load_data_lookup = (
    tempo_mapped_load_data_lookup
    .filter("id is not NULL")
    .select("sector", "scenario", "model_year", "geography", "id", "fraction")
    .join(tz_map, on=F.col("geography") == tz_map.from_id, how="left")
    .drop("from_id")
    .withColumnRenamed("to_id", "timezone")
)

## fold from_fraction into fraction; rows with no from_fraction are filled with 1 first
nonfraction_cols = [
    name
    for name in load_data_lookup.columns
    if name not in ("fraction", "from_fraction")
]
load_data_lookup = (
    load_data_lookup
    .fillna(1, subset=["from_fraction"])
    .selectExpr(*nonfraction_cols, "fraction*from_fraction AS fraction")
)
# load_data_lookup.show()

CPU times: user 0 ns, sys: 7.13 ms, total: 7.13 ms
Wall time: 60.9 ms


In [25]:
%%time
## 2. join the lookup with the load data on id, then total fraction * electricity
##    for every combination of the grouping columns.
# NOTE: `tra_elec_kwh` is reassigned in place and loses its "id" column here, so this
# cell cannot be run twice without re-running the cell that first defines it.
tra_elec_kwh = (
    load_data_lookup
    .join(tra_elec_kwh, on="id", how="left")
    .drop("id")
    .groupBy(
        "sector", "scenario", "geography", "model_year",
        "timezone", "day_of_week", "month", "hour",
    )
    .agg(
        F.sum(F.col("fraction") * F.col("electricity")).alias("electricity")
    )
)

## optional: cache the result
# tra_elec_kwh = tra_elec_kwh.cache()
# tra_elec_kwh.show()

CPU times: user 6.1 ms, sys: 0 ns, total: 6.1 ms
Wall time: 58.9 ms


In [26]:
%%time
year = 2012 # <--- weather year
# NOTE(review): the target zone is hard-coded to EST here — presumably it has to agree
# with the time zone the Spark session was created with; confirm.
sys_tz = TimeZone.EST.tz
timezones_local = [TimeZone.EPT, TimeZone.CPT, TimeZone.MPT, TimeZone.PPT]

## 3. create range of model_year
# One pandas frame per local time zone: every hour of `year` in that zone, plus the
# local day_of_week / month / hour stored as strings.
model_time_frames = []
for tz in timezones_local:
    model_time_df = pd.DataFrame()
    # create time range in local time
    model_time_df["timestamp"] = pd.date_range(
        start=datetime(year=int(year), month=1, day=1, hour=0),
        end=datetime(year=int(year), month=12, day=31, hour=23),
        tz=tz.tz,
        freq="H")
    model_time_df["timezone"] = tz.value
    model_time_df["day_of_week"] = model_time_df["timestamp"].dt.day_of_week.astype(str)
    model_time_df["month"] = model_time_df["timestamp"].dt.month.astype(str)
    model_time_df["hour"] = model_time_df["timestamp"].dt.hour.astype(str)

    # convert to main timezone
    model_time_df["timestamp"] = model_time_df["timestamp"].dt.tz_convert(sys_tz)
    # wrap time to year: any timestamp the conversion pushed out of `year` gets its
    # year field reset to `year`
    model_time_df["timestamp"] = model_time_df["timestamp"].apply(lambda x: x.replace(year=year))

    model_time_frames.append(model_time_df)

model_time_pd = pd.concat(model_time_frames, axis=0, ignore_index=True)
model_time_pd["timestamp"] = model_time_pd["timestamp"].dt.tz_localize(None).astype(str) # convert timestamp to str, this is important!
print(model_time_pd)

# convert to spark df
schema = sparktypes.StructType([
    sparktypes.StructField("timestamp", sparktypes.StringType(), False),
    sparktypes.StructField("timezone", sparktypes.StringType(), False),
    sparktypes.StructField("day_of_week", sparktypes.StringType(), False),
    sparktypes.StructField("month", sparktypes.StringType(), False),
    sparktypes.StructField("hour", sparktypes.StringType(), False),
])
model_time = spark.createDataFrame(model_time_pd, schema=schema)

## convert timestamp from str to timestamp
model_time = model_time.withColumn("timestamp", F.from_unixtime(
    F.unix_timestamp(
        F.col("timestamp"), "yyyy-MM-dd HH:mm:ss"
    ), "yyyy-MM-dd HH:mm:ss"
))
model_time = model_time.withColumn("timestamp", F.to_timestamp("timestamp"))
model_time = model_time.cache()

# printSchema() prints the schema itself and returns None, so it is not wrapped in
# print() (that only added a stray "None" line to the output).
model_time.printSchema()
print(model_time.count())
model_time.show()

                 timestamp           timezone day_of_week month hour
0      2012-01-01 00:00:00  EasternPrevailing           6     1    0
1      2012-01-01 01:00:00  EasternPrevailing           6     1    1
2      2012-01-01 02:00:00  EasternPrevailing           6     1    2
3      2012-01-01 03:00:00  EasternPrevailing           6     1    3
4      2012-01-01 04:00:00  EasternPrevailing           6     1    4
...                    ...                ...         ...   ...  ...
35131  2012-12-31 22:00:00  PacificPrevailing           0    12   19
35132  2012-12-31 23:00:00  PacificPrevailing           0    12   20
35133  2012-01-01 00:00:00  PacificPrevailing           0    12   21
35134  2012-01-01 01:00:00  PacificPrevailing           0    12   22
35135  2012-01-01 02:00:00  PacificPrevailing           0    12   23

[35136 rows x 5 columns]
root
 |-- timestamp: timestamp (nullable = true)
 |-- timezone: string (nullable = false)
 |-- day_of_week: string (nullable = false)
 |-- month: 

In [27]:
%%time
## 4. expand to model_years
# Right join: every row of tra_elec_kwh is kept and paired with each model_time row
# that has the same timezone, day_of_week, month and hour. The three calendar keys are
# dropped afterwards; timezone stays.
# NOTE: tra_elec_kwh is reassigned in place and loses the dropped key columns, so this
# cell cannot be re-run on its own.
tra_elec_kwh = model_time.join(
    tra_elec_kwh,
    on=["timezone", "day_of_week", "month", "hour"], 
    how="right"
).drop("day_of_week", "month", "hour")

## cache df
# tra_elec_kwh = tra_elec_kwh.cache()
# tra_elec_kwh.show()

CPU times: user 25 µs, sys: 3.57 ms, total: 3.59 ms
Wall time: 20.6 ms


In [28]:
%%time
# 5. map the load data from its current geography to PCA and total it per
#    sector / scenario / PCA / model_year / timestamp
tra_elec_kwh = (
    tra_elec_kwh
    .join(
        county_to_pca_map,
        on=F.col("geography") == county_to_pca_map.from_id,
        how="left",
    )
    # Apply the mapping's from_fraction so a source geography split across several
    # PCAs is not counted in full for each of them. Rows without a match in the map
    # keep their full value (fraction treated as 1), which matches how the time-zone
    # join fills a missing from_fraction with 1.
    .withColumn(
        "electricity",
        F.col("electricity") * F.coalesce(F.col("from_fraction"), F.lit(1.0)),
    )
    .drop("from_id", "from_fraction")
    .drop("geography")
    .withColumnRenamed("to_id", "geography")
    .groupBy("sector", "scenario", "geography", "model_year", "timestamp")
    .agg(F.sum("electricity").alias("electricity"))
)

# tra_elec_kwh.show()

CPU times: user 4.04 ms, sys: 329 µs, total: 4.37 ms
Wall time: 33.1 ms


In [29]:
%%time
### 6. save as partitions
tra_output_file = "s3://nrel-dsgrid-int-scratch/scratch-lliu2/tempo_projections.parquet" #Path(f"/scratch/{getpass.getuser()}/tempo_projections.parquet")

# Refresh any earlier output.
# `tra_output_file` can be an object-store URI (s3://...). pathlib and shutil only see
# the local filesystem, so an exists()/rmtree() check never fires for such a path.
# Spark's own overwrite mode is used instead; it replaces both the files under the
# path and the `tra_elec_kwh` table entry, for local and remote locations alike.
#
# Rows are ordered *after* the repartition: a sort placed before repartition() is
# discarded by the shuffle, so it only cost a full global sort. Sorting within each
# partition keeps every written file ordered by geography and timestamp.
(
    tra_elec_kwh
    .repartition("scenario", "model_year")
    .sortWithinPartitions("scenario", "model_year", "geography", "timestamp")
    .write
    .partitionBy("scenario", "model_year")
    .option("path", tra_output_file)
    .saveAsTable("tra_elec_kwh", format='parquet', mode="overwrite")
)

print("tra_elec_kwh saved")

                                                                                

tra_elec_kwh saved
CPU times: user 2.28 s, sys: 529 ms, total: 2.81 s
Wall time: 1h 36min 55s


In [30]:
# %%time
# ########## load transportation projection data ###########
# tra_output_file = "s3://nrel-dsgrid-int-scratch/scratch-lliu2/tempo_projections.parquet" #Path(f"/scratch/{getpass.getuser()}/tempo_projections.parquet")

# if Path(tra_output_file).exists():
#     tra_elec_kwh = read_dataframe(tra_output_file)
#     print("tra_elec_kwh loaded")
# else:
#     print(f"tra_output_file={tra_output_file} does not exist")

In [32]:
%%time
# Number of rows per timestamp, ordered by timestamp and collected into a pandas frame.
# NOTE(review): the cache() calls on tra_elec_kwh are commented out in the earlier cells,
# so this presumably recomputes the whole lineage rather than reading the table that was
# just saved — confirm, and consider reading the saved output back instead.
ts = tra_elec_kwh.groupBy("timestamp").count().orderBy("timestamp").toPandas()
ts

22/06/18 04:04:09 WARN TaskSetManager: Lost task 86.0 in stage 109.0 (TID 6830) (ip-172-18-27-14.us-west-2.compute.internal executor 1): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 32768 bytes of memory, got 0
	at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:158)
	at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:97)
	at org.apache.spark.shuffle.sort.ShuffleInMemorySorter.reset(ShuffleInMemorySorter.java:109)
	at org.apache.spark.shuffle.sort.ShuffleExternalSorter.spill(ShuffleExternalSorter.java:318)
	at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:213)
	at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:297)
	at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:95)
	at org.apache.spark.shuffle.sort.ShuffleExternalSorter.growPointerArrayIfNecessary(ShuffleExternalSorter.java:390)
	at org.apache.spark.shuffle.sort.ShuffleE

Unnamed: 0,timestamp,count
0,2012-01-01 00:00:00,16340
1,2012-01-01 01:00:00,16340
2,2012-01-01 02:00:00,16340
3,2012-01-01 03:00:00,16340
4,2012-01-01 04:00:00,16340
...,...,...
8779,2012-12-31 19:00:00,16340
8780,2012-12-31 20:00:00,16340
8781,2012-12-31 21:00:00,16340
8782,2012-12-31 22:00:00,16340
