# DS5460 Milestone 2 - EDA 

### Ingesting Files to PySpark

Author: Anne Tumlin

Date: 03/21/25

Now that we have taken the files from the original GCS bucket, extracted them, and put them in our local GCS bucket (see `docs/EXTRACTION_PROCESS` in GitHub for more details), we can begin to ingest our data into PySpark. 

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("app_name") \
    .getOrCreate()

Remember to replace the bucket in the path below with your own Google Cloud Storage bucket.

In [3]:
json_path = "gs://ds5460-tumlinam-fp-bucket/gridopt-dataset-tmp/dataset_release_1/pglib_opf_case500_goc/group_1/*.json"

### Testing on a Small Scale
Let's try this with only 100 files first. 

In [9]:
from google.cloud import storage
client = storage.Client()

Reminder: change `bucket_name` to your own bucket.

In [10]:
bucket_name = "ds5460-tumlinam-fp-bucket"
bucket = client.get_bucket(bucket_name)

In [12]:
prefix = "gridopt-dataset-tmp/dataset_release_1/pglib_opf_case500_goc/group_1/"

In [13]:
# List blobs (files) in the specified prefix and collect the first 100 file paths
blobs = bucket.list_blobs(prefix=prefix)
file_paths = []
for blob in blobs:
    file_paths.append(f"gs://{bucket_name}/{blob.name}")
    if len(file_paths) >= 100:
        break
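
The loop above stops once 100 paths have been collected. As a minimal sketch of the same "take the first N" pattern, `itertools.islice` can cap a lazy iterator without an explicit counter. Here `fake_blob_names` is a hypothetical stand-in for the iterator returned by `bucket.list_blobs(prefix=prefix)`, and `my-bucket` is a placeholder, so the snippet runs without GCS access.

```python
from itertools import islice


def fake_blob_names(prefix, total=1000):
    """Hypothetical stand-in for bucket.list_blobs(prefix=...); yields object names lazily."""
    for i in range(total):
        yield f"{prefix}example_{15000 + i}.json"


def first_n_paths(names, bucket_name, n=100):
    """Build gs:// URIs for the first n names without consuming the whole iterator."""
    return [f"gs://{bucket_name}/{name}" for name in islice(names, n)]


bucket_name = "my-bucket"  # assumption: replace with your own bucket
prefix = "gridopt-dataset-tmp/dataset_release_1/pglib_opf_case500_goc/group_1/"
file_paths = first_n_paths(fake_blob_names(prefix), bucket_name, n=100)
print(len(file_paths), file_paths[0])
```

With the real client, passing `max_results=100` to `bucket.list_blobs` should have a similar effect, though I have not tested that here.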

In [14]:
print("Loading the following 100 JSON file paths:")
for path in file_paths:
    print(path)

Loading the following 100 JSON file paths:
gs://ds5460-tumlinam-fp-bucket/gridopt-dataset-tmp/dataset_release_1/pglib_opf_case500_goc/group_1/example_15000.json
gs://ds5460-tumlinam-fp-bucket/gridopt-dataset-tmp/dataset_release_1/pglib_opf_case500_goc/group_1/example_15001.json
gs://ds5460-tumlinam-fp-bucket/gridopt-dataset-tmp/dataset_release_1/pglib_opf_case500_goc/group_1/example_15002.json
gs://ds5460-tumlinam-fp-bucket/gridopt-dataset-tmp/dataset_release_1/pglib_opf_case500_goc/group_1/example_15003.json
gs://ds5460-tumlinam-fp-bucket/gridopt-dataset-tmp/dataset_release_1/pglib_opf_case500_goc/group_1/example_15004.json
gs://ds5460-tumlinam-fp-bucket/gridopt-dataset-tmp/dataset_release_1/pglib_opf_case500_goc/group_1/example_15005.json
gs://ds5460-tumlinam-fp-bucket/gridopt-dataset-tmp/dataset_release_1/pglib_opf_case500_goc/group_1/example_15006.json
gs://ds5460-tumlinam-fp-bucket/gridopt-dataset-tmp/dataset_release_1/pglib_opf_case500_goc/group_1/example_15007.json
gs://ds5460-t

**IMPORTANT NOTE:** After running into schema issues during testing, I found that because of the way the JSON files are formatted (each record spans multiple lines), we must use the `multiline` read option. Otherwise the data is not parsed properly: instead of the nested fields, the schema contains only a single column, `|-- _corrupt_record: string (nullable = true)`.
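
To see why this happens, here is a plain-Python sketch (not Spark itself) of the difference. By default Spark's JSON reader expects one complete JSON record per line, so a pretty-printed file fails line by line, while parsing the whole file at once succeeds. The document below is a made-up miniature; only `metadata.objective` mirrors a field from our data, and `base_mva` is a hypothetical name.

```python
import json

# A pretty-printed JSON document, similar in layout (not content) to the dataset files.
pretty = """{
  "metadata": {
    "objective": 1.0
  },
  "grid": {
    "base_mva": 100.0
  }
}"""


def parse_line_by_line(text):
    """Mimic a line-delimited reader: each line must be a complete JSON value."""
    good, bad = [], []
    for line in text.splitlines():
        try:
            good.append(json.loads(line))
        except json.JSONDecodeError:
            bad.append(line)
    return good, bad


good, bad = parse_line_by_line(pretty)
whole = json.loads(pretty)  # roughly what a multiline read does: parse the file as one record

print(f"line-by-line: {len(good)} parsed, {len(bad)} failed")
print("whole document keys:", sorted(whole))
```

Every line fails on its own because none of them is valid JSON in isolation, which is the situation that produces `_corrupt_record` in Spark.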

In [19]:
# Make sure to use multiline read option!
df_small = spark.read.option("multiline", "true").json(file_paths)

In [20]:
df_small.printSchema()

root
 |-- grid: struct (nullable = true)
 |    |-- context: array (nullable = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: double (containsNull = true)
 |    |-- edges: struct (nullable = true)
 |    |    |-- ac_line: struct (nullable = true)
 |    |    |    |-- features: array (nullable = true)
 |    |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |    |-- element: double (containsNull = true)
 |    |    |    |-- receivers: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- senders: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |-- generator_link: struct (nullable = true)
 |    |    |    |-- receivers: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- senders: array (nullable = true)
 |    |    |  

In [21]:
df_small.show(2)

+--------------------+--------------------+--------------------+
|                grid|            metadata|            solution|
+--------------------+--------------------+--------------------+
|[[[[100.0]]], [[[...| [443934.8106702195]|[[[[[1.2271252469...|
|[[[[100.0]]], [[[...|[465533.45792886155]|[[[[[1.3700529900...|
+--------------------+--------------------+--------------------+
only showing top 2 rows



### Testing on the Full Dataset

In [None]:
df = spark.read.option("multiline", "true").json(json_path)

In [None]:
df.printSchema()

### Testing the Summarizing Relevant Datasets Code for the New Dataset
  
Author: Ewan Long
  
Date: 03/23/25
  
We are testing and revising the code from Milestone 1 because we changed our dataset (see `docs/EXTRACTION_PROCESS` in GitHub for the reasons and more details). The new data has already been ingested into PySpark.
  
The following code aggregates the data and joins the key tables (see the Milestone 1 notebook and doc for the reasoning).

___

First, initialize the SparkSession. We process only the first 100 JSON files here so that the test runs quickly and is easy to verify.

In [1]:
from google.cloud import storage
client = storage.Client()

In [2]:
import os
import subprocess
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col, explode, input_file_name, expr, sum as spark_sum, avg, count

In [3]:
spark = SparkSession.builder \
    .appName("app_name") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/24 04:05:44 INFO SparkEnv: Registering MapOutputTracker
25/03/24 04:05:44 INFO SparkEnv: Registering BlockManagerMaster
25/03/24 04:05:44 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
25/03/24 04:05:44 INFO SparkEnv: Registering OutputCommitCoordinator


In [4]:
json_path = "gs://dataproc-temp-us-central1-299400297029-n5lhuci2/gridopt-dataset-tmp/dataset_release_1/pglib_opf_case500_goc/group_1/*.json"


In [5]:
# Change this to your own bucket
bucket_name = "dataproc-temp-us-central1-299400297029-n5lhuci2"
bucket = client.get_bucket(bucket_name)

In [6]:
prefix = "gridopt-dataset-tmp/dataset_release_1/pglib_opf_case500_goc/group_1/"

In [7]:
# List blobs (files) in the specified prefix and collect the first 100 file paths
blobs = bucket.list_blobs(prefix=prefix)
file_paths = []
for blob in blobs:
    file_paths.append(f"gs://{bucket_name}/{blob.name}")
    if len(file_paths) >= 100:
        break

In [8]:
print("Loading the following 100 JSON file paths:")
for path in file_paths:
    print(path)

Loading the following 100 JSON file paths:
gs://dataproc-temp-us-central1-299400297029-n5lhuci2/gridopt-dataset-tmp/dataset_release_1/pglib_opf_case500_goc/group_1/example_15000.json
gs://dataproc-temp-us-central1-299400297029-n5lhuci2/gridopt-dataset-tmp/dataset_release_1/pglib_opf_case500_goc/group_1/example_15001.json
gs://dataproc-temp-us-central1-299400297029-n5lhuci2/gridopt-dataset-tmp/dataset_release_1/pglib_opf_case500_goc/group_1/example_15002.json
gs://dataproc-temp-us-central1-299400297029-n5lhuci2/gridopt-dataset-tmp/dataset_release_1/pglib_opf_case500_goc/group_1/example_15003.json
gs://dataproc-temp-us-central1-299400297029-n5lhuci2/gridopt-dataset-tmp/dataset_release_1/pglib_opf_case500_goc/group_1/example_15004.json
gs://dataproc-temp-us-central1-299400297029-n5lhuci2/gridopt-dataset-tmp/dataset_release_1/pglib_opf_case500_goc/group_1/example_15005.json
gs://dataproc-temp-us-central1-299400297029-n5lhuci2/gridopt-dataset-tmp/dataset_release_1/pglib_opf_case500_goc/grou

Then, read all JSON files with the multiline option enabled so that nested JSON is parsed correctly. We also add a **filename** column holding each row's source file path, for identification and as the join key.

In [9]:

df = spark.read.option("multiline", "true").json(file_paths).withColumn("filename", input_file_name())

# Print the schema to verify that 'grid.nodes.generator' and others are parsed correctly.
df.printSchema()

                                                                                

root
 |-- grid: struct (nullable = true)
 |    |-- context: array (nullable = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: double (containsNull = true)
 |    |-- edges: struct (nullable = true)
 |    |    |-- ac_line: struct (nullable = true)
 |    |    |    |-- features: array (nullable = true)
 |    |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |    |-- element: double (containsNull = true)
 |    |    |    |-- receivers: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- senders: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |-- generator_link: struct (nullable = true)
 |    |    |    |-- receivers: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- senders: array (nullable = true)
 |    |    |  
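
The **filename** column holds the full `gs://` URI, which is long to display and compare. As an optional sketch, a shorter snapshot id can be derived from it. `snapshot_id` is a hypothetical helper, not part of the notebook's pipeline.

```python
import posixpath


def snapshot_id(uri):
    """Return the file stem of a gs:// URI, i.e. the last path component without '.json'."""
    return posixpath.splitext(posixpath.basename(uri))[0]


uri = ("gs://ds5460-tumlinam-fp-bucket/gridopt-dataset-tmp/dataset_release_1/"
       "pglib_opf_case500_goc/group_1/example_15000.json")
print(snapshot_id(uri))
```

Inside Spark, something like `F.regexp_extract(input_file_name(), r"([^/]+)\.json$", 1)` would likely give the same result, though that variant is untested here.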

Now we can process the data, starting with the generators.
  
We explode the nested `grid.nodes.generator` array so that each generator becomes its own row, then extract its attributes by position.

In [10]:
gen_df = df.select(
    "filename",
    explode("grid.nodes.generator").alias("generator_array")
).select(
    "filename",
    col("generator_array")[0].alias("mbase"),
    col("generator_array")[1].alias("pg"),
    col("generator_array")[2].alias("pmin"),
    col("generator_array")[3].alias("pmax"),
    col("generator_array")[4].alias("qg"),
    col("generator_array")[5].alias("qmin"),
    col("generator_array")[6].alias("qmax"),
    col("generator_array")[7].alias("vg"),
    col("generator_array")[8].alias("cost_squared"),
    col("generator_array")[9].alias("cost_linear"),
    col("generator_array")[10].alias("cost_offset")
)

gen_df.show(5) # verify the extracted attributes

                                                                                

+--------------------+-----+-------------------+--------------------+-----+--------+--------+-------+---+------------+-----------+-----------+
|            filename|mbase|                 pg|                pmin| pmax|      qg|    qmin|   qmax| vg|cost_squared|cost_linear|cost_offset|
+--------------------+-----+-------------------+--------------------+-----+--------+--------+-------+---+------------+-----------+-----------+
|gs://dataproc-tem...| 46.9|            0.34463|             0.22026|0.469| 0.07856|-0.02298| 0.1801|1.0|         0.0|     3000.0|        0.0|
|gs://dataproc-tem...|888.2|            6.43217|  3.9823399999999998|8.882| 1.27013|-0.72832|3.26858|1.0|       10.98|      910.9|        0.0|
|gs://dataproc-tem...| 25.0|            0.16691| 0.08381999999999999| 0.25|0.041875|-0.01225|  0.096|1.0|         0.0|     3000.0|        0.0|
|gs://dataproc-tem...| 25.0|            0.13858|0.027160000000000004| 0.25|0.041875|-0.01225|  0.096|1.0|         0.0|     3000.0|        0.0|
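
For intuition, here is a plain-Python analogue (not Spark) of what `explode` plus positional indexing does: one input record with a list of generators becomes one row per generator, each carrying the parent filename. The column order is copied from the select above; the toy record and its numbers are made up for illustration.

```python
# Column order copied from the select above; positions index into each generator's feature list.
GEN_COLUMNS = ["mbase", "pg", "pmin", "pmax", "qg", "qmin", "qmax",
               "vg", "cost_squared", "cost_linear", "cost_offset"]


def explode_generators(record):
    """Yield one dict per generator, carrying the parent filename along (like explode)."""
    for features in record["grid"]["nodes"]["generator"]:
        row = {"filename": record["filename"]}
        row.update(zip(GEN_COLUMNS, features))
        yield row


# Toy record with two generators; the numbers are made up for illustration.
record = {
    "filename": "example_0.json",
    "grid": {"nodes": {"generator": [
        [46.9, 0.34, 0.22, 0.469, 0.08, -0.02, 0.18, 1.0, 0.0, 3000.0, 0.0],
        [25.0, 0.17, 0.08, 0.25, 0.04, -0.01, 0.096, 1.0, 0.0, 3000.0, 0.0],
    ]}},
}

rows = list(explode_generators(record))
for row in rows:
    print(row["filename"], row["mbase"], row["pg"], row["cost_linear"])
```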

Next, we aggregate the generator features at the snapshot level, summarizing key characteristics for each filename.
  
The following statistics will be extracted and computed:
- Total number of generators
- Sum of generated power (pg)
- Average generator voltage (vg)
- Average generator cost parameters (quadratic, linear, and offset terms)

In [11]:
# Aggregate generator features by snapshot (filename)
gen_agg = gen_df.groupBy("filename").agg(
    count("*").alias("num_generators"),
    spark_sum("pg").alias("total_pg"),
    avg("vg").alias("avg_vg"),
    avg("cost_squared").alias("avg_cost_squared"),
    avg("cost_linear").alias("avg_cost_linear"),
    avg("cost_offset").alias("avg_cost_offset")
)
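
The `groupBy("filename").agg(...)` call above collapses many generator rows into one row per snapshot. A plain-Python sketch of the same idea, using made-up rows and only three of the statistics:

```python
from collections import defaultdict

# Toy exploded rows: (filename, pg, vg). Values are made up for illustration.
rows = [
    ("a.json", 1.0, 1.0),
    ("a.json", 2.0, 1.0),
    ("b.json", 4.0, 0.5),
]

# Group the rows by filename.
groups = defaultdict(list)
for filename, pg, vg in rows:
    groups[filename].append((pg, vg))

# One output row per filename: count, sum of pg, mean of vg.
gen_agg = {
    filename: {
        "num_generators": len(items),
        "total_pg": sum(pg for pg, _ in items),
        "avg_vg": sum(vg for _, vg in items) / len(items),
    }
    for filename, items in groups.items()
}
print(gen_agg)
```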

The next step processes and aggregates the load data, extracting the key load parameters (real power demand `pd` and reactive power demand `qd`) per snapshot.

In [12]:
# Extracting load data
load_df = df.select(
    "filename",
    col("grid.nodes.load").alias("load_arrays")  
).select(
    "filename",
    explode("load_arrays").alias("load")  
).select(
    "filename",
    col("load")[0].alias("pd"),  
    col("load")[1].alias("qd")
)

load_df.show(5)


ERROR:root:Exception while sending command.                         (0 + 1) / 1]
Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.11/site-packages/IPython/core/interactiveshell.py", line 3579, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_27463/2439983976.py", line 14, in <module>
    load_df.show(5)
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 947, in show
    print(self._show_string(n, truncate, vertical))
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 965, in _show_string
    return self._jdf.showString(n, 20, vertical)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/miniconda3/lib/python3.11/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "/usr/lib/spark/python/pyspark/errors/exceptions/captured.py", line 179, in deco
 

ConnectionRefusedError: [Errno 111] Connection refused

In [None]:
# Aggregate load features per snapshot
load_agg = load_df.groupBy("filename").agg(
    count("*").alias("num_loads"),
    spark_sum("pd").alias("total_pd")
)

Similarly, the next step extracts and aggregates the transmission line features.

In [None]:
# Extract transmission line features from grid.edges.ac_line.features
ac_line_df = df.select(
    "filename",
    explode("grid.edges.ac_line.features").alias("ac_line")
).select(
    "filename",
    col("ac_line")[2].alias("br_r"),
    col("ac_line")[3].alias("br_x"),
    col("ac_line")[4].alias("rate_a"),
    col("ac_line")[5].alias("rate_b"),
    col("ac_line")[6].alias("rate_c")
)

ac_line_df.show(5)

In [None]:
# Aggregate transmission line features per snapshot
ac_line_agg = ac_line_df.groupBy("filename").agg(
    avg("br_r").alias("br_r_mean"),
    avg("br_x").alias("br_x_mean"),
    spark_sum("rate_a").alias("rate_a_sum"),
    F.min("rate_b").alias("rate_b_min"),
    F.max("rate_c").alias("rate_c_max")
)

Similarly, extract and aggregate the transformer features.

In [None]:
# Extract transformer features from grid.edges.transformer.features
trans_df = df.select(
    "filename",
    explode("grid.edges.transformer.features").alias("trans")
).select(
    "filename",
    col("trans")[2].alias("br_r"),
    col("trans")[3].alias("br_x"),
    col("trans")[4].alias("rate_a"),
    col("trans")[7].alias("tap")
)

trans_df.show(5)

In [None]:
# Aggregate transformer features per snapshot
trans_agg = trans_df.groupBy("filename").agg(
    avg("br_r").alias("trans_br_r_mean"),
    avg("br_x").alias("trans_br_x_mean"),
    spark_sum("rate_a").alias("trans_rate_a_sum"),
    avg("tap").alias("tap_mean")
)

Similarly, extract the objective value (total cost).

In [None]:
# Extract the objective value directly from metadata
obj_df = df.select(
    "filename",
    col("metadata.`objective`").alias("total_cost") 
)


obj_df.show(5)

Finally, we merge the aggregated data from the different sources and check data consistency by looking for duplicate entries. The main steps are:
  
- Joining aggregated data from generator, load, transmission line, transformer and objective value based on "filename" into a single DataFrame.
- Validating the aggregated results
- Checking for duplicate filenames

In [None]:
# Use inner joins on 'filename' (the snapshot key) to combine all per-snapshot tables
agg_df = gen_agg.join(load_agg, "filename", "inner") \
                .join(ac_line_agg, "filename", "inner") \
                .join(trans_agg, "filename", "inner") \
                .join(obj_df, "filename", "inner")

# Validate the result
print("Total number of rows after aggregation:", agg_df.count())
agg_df.show(5, truncate=False)  # Display the first 5 rows without truncation

# Check for duplicate filenames (F is pyspark.sql.functions, imported at the top of the notebook)

duplicate_check = agg_df.groupBy("filename").agg(
    F.count("*").alias("count")
).filter(F.col("count") > 1)

if duplicate_check.count() == 0:
    print("All filenames are unique")
else:
    print("Duplicate filenames exist! Data consistency needs to be checked")
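
As a small plain-Python sketch of the join and duplicate check above (not Spark, and with made-up values), note what an inner join does to snapshots that are missing from one of the tables. `inner_join` is a hypothetical helper written only for this illustration.

```python
from collections import Counter


def inner_join(left, right, key="filename"):
    """Inner join two lists of dicts on `key`; rows without a match on both sides are dropped."""
    index = {}
    for rrow in right:
        index.setdefault(rrow[key], []).append(rrow)
    return [{**lrow, **rrow} for lrow in left for rrow in index.get(lrow[key], [])]


# Toy per-snapshot aggregates; values are made up for illustration.
gen_agg = [{"filename": "a.json", "total_pg": 3.0}, {"filename": "b.json", "total_pg": 4.0}]
load_agg = [{"filename": "a.json", "total_pd": 2.5}]  # b.json is missing here

agg = inner_join(gen_agg, load_agg)
duplicates = [name for name, n in Counter(r["filename"] for r in agg).items() if n > 1]

print(agg)
print("duplicates:", duplicates)
```

Because an inner join silently drops any snapshot that is absent from one table, comparing the joined row count against the number of input files (100 in this test) is a cheap extra consistency check alongside the duplicate check.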
