# **Dataset Integration**

### Load data into Spark

In [1]:
import sys
sys.path.append("/usr/local/python-env/py39/lib/python3.9/site-packages")

import pyspark
print(pyspark.__version__)

print(sys.executable)

3.5.1
/usr/bin/python3.9


### Initialze a SparkSession

Ensuring the pyspark library is being accessed from my local usr directory.


In [2]:
import os
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3.9'

In [3]:
import pkg_resources

sedona_version = pkg_resources.get_distribution("apache-sedona").version
print(f"Apache Sedona version: {sedona_version}")


Apache Sedona version: 1.5.1


In [4]:
print(os.environ['SPARK_HOME'])
print(os.environ['PYSPARK_PYTHON'])

/usr/local/spark/latest
/usr/bin/python3.9


In [5]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, lit,  split, expr, concat, when, explode
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DateType
from pyspark.sql.functions import year 
from pyspark.sql import Window
from pyspark.sql.functions import sum as pyspark_sum
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from sedona.spark import *
import geopandas as gpd

Skipping SedonaKepler import, verify if keplergl is installed


## Now to make the app

In [6]:
spark = SparkSession \
    .builder \
    .appName('GeoSpatialQueries_Freddy') \
    .master('spark://columbus-oh.cs.colostate.edu:30800') \
    .config("spark.yarn.resourcemanager.address", "columbia.cs.colostate.edu:30799") \
    .config("spark.executor.memory", "3g") \
    .config("spark.executor.memoryOverhead", "512m") \
    .config("spark.memory.offHeap.enabled", True) \
    .config("spark.memory.offHeap.size", "500m") \
    .config("spark.serializer", KryoSerializer.getName) \
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName) \
    .config('spark.jars.packages',
            'org.apache.sedona:sedona-spark-3.5_2.12:1.5.1,'
            'org.datasyslab:geotools-wrapper:1.5.1-28.2') \
    .config('spark.jars.repositories', 'https://artifacts.unidata.ucar.edu/repository/unidata-all') \
    .getOrCreate()

# Set log level to DEBUG
spark.sparkContext.setLogLevel("ERROR")

sedona = SedonaContext.create(spark)
SedonaRegistrator.registerAll(spark)

# create a logger
logger = spark._jvm.org.apache.log4j.LogManager.getLogger(__name__)
logger.info("Pyspark initialized...")

https://artifacts.unidata.ucar.edu/repository/unidata-all added as a remote repository with the name: repo-1
Ivy Default Cache set to: /s/chopin/a/grad/flarrieu/.ivy2/cache
The jars for the packages stored in: /s/chopin/a/grad/flarrieu/.ivy2/jars
org.apache.sedona#sedona-spark-3.5_2.12 added as a dependency
org.datasyslab#geotools-wrapper added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ce28daa3-3029-4a04-ac2d-9eb62b97621d;1.0
	confs: [default]
	found org.apache.sedona#sedona-spark-3.5_2.12;1.5.1 in central


:: loading settings :: url = jar:file:/usr/local/spark/3.5.0-with-hadoop3.3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.sedona#sedona-common;1.5.1 in central
	found org.apache.commons#commons-math3;3.6.1 in central
	found org.locationtech.jts#jts-core;1.19.0 in central
	found org.wololo#jts2geojson;0.16.1 in central
	found org.locationtech.spatial4j#spatial4j;0.8 in central
	found com.google.geometry#s2-geometry;2.0.0 in central
	found com.google.guava#guava;25.1-jre in central
	found com.google.code.findbugs#jsr305;3.0.2 in user-list
	found org.checkerframework#checker-qual;2.0.0 in central
	found com.google.errorprone#error_prone_annotations;2.1.3 in central
	found com.google.j2objc#j2objc-annotations;1.1 in central
	found org.codehaus.mojo#animal-sniffer-annotations;1.14 in central
	found com.uber#h3;4.1.1 in central
	found net.sf.geographiclib#GeographicLib-Java;1.52 in central
	found com.github.ben-manes.caffeine#caffeine;2.9.2 in central
	found org.checkerframework#checker-qual;3.10.0 in central
	found com.google.errorprone#error_prone_annotations;2.5.1 in central
	found org.apac

# **Helper Functions**

In [7]:
# Import the necessary module from py4j to interact with JVM
from py4j.java_gateway import java_import

# Import the Path class from Hadoop. This class is used to handle file paths in Hadoop.
java_import(spark._jvm, 'org.apache.hadoop.fs.Path')

fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())

def get_files_recursive(path):
    file_status_arr = fs.listStatus(spark._jvm.Path(path))
    file_paths = []
    
    for file_status in file_status_arr:
        if file_status.isDirectory():
            file_paths += get_files_recursive(file_status.getPath().toString())
        elif file_status.getPath().getName().endswith(('.geojson')):
            file_paths.append(file_status.getPath().toString())
    
    print(file_paths)
    return file_paths

In [8]:
def create_csv(df: DataFrame, path: str):
    df.write.csv(path=path, mode='append', header=True)

In [9]:
def append_to_csv(df: DataFrame, path: str):
    df.write.csv(path=path, mode='append', header=True)

In [10]:
def get_file_to_df(file_name: str): 
    data_directory = "hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/"
    print(f"Processing file: {file_name}")

    geojson_schema =  "type string, crs string, totalFeatures long, features array<struct<type string, geometry string, properties map<string, string>>>"

    df = spark.read.schema(geojson_schema).json(data_directory + file_name, multiLine=True)
    
    df = (df
        .select(explode("features").alias("features"))
        .select("features.*")
        .withColumn("geometry", expr("ST_GeomFromGeoJSON(geometry)"))
    )
    
    return df

In [11]:
def create_edge_df(df_parent: DataFrame, subject, relationship, object):
    df_edges = df_parent.withColumn("Subject", subject) \
        .withColumn("Relationship", relationship) \
        .withColumn("Object", object) \
        .select("Subject", "Relationship", "Object")
    return df_edges

In [12]:
def create_node_df(df_edges: DataFrame, type):
    nodes_subject = df_edges.select(col("Subject").alias("Node_ID")).distinct()
    nodes_object = df_edges.select(col("Object").alias("Node_ID")).distinct()

    nodes_df = nodes_subject.union(nodes_object).distinct()

    nodes_df = nodes_df.withColumn("Type", type)
    
    return nodes_df

In [13]:
def create_relationships_point(df_parent: DataFrame, df_child: DataFrame, ):
    df_parent = df_parent.withColumnRenamed("geometry", "parent_geometry")
    df_child = df_child.withColumnRenamed("geometry", "child_geometry")
    df_parent = df_parent.withColumnRenamed("properties", "parent_properties")
    df_child = df_child.withColumnRenamed("properties", "child_properties")

    df_parent.createOrReplaceTempView("parents")
    df_child.createOrReplaceTempView("children")

    df_country_continent_contains = spark.sql("""
        SELECT *
        FROM parents, children
        WHERE ST_Contains(parent_geometry, ST_Centroid(child_geometry))
    """)

    return df_country_continent_contains.select("parent_geometry", "child_geometry", "parent_properties", "child_properties")

In [14]:
def create_relationships_geometry(df_parent: DataFrame, df_child: DataFrame, ):
    df_parent = df_parent.withColumnRenamed("geometry", "parent_geometry")
    df_child = df_child.withColumnRenamed("geometry", "child_geometry")
    df_parent = df_parent.withColumnRenamed("properties", "parent_properties")
    df_child = df_child.withColumnRenamed("properties", "child_properties")

    df_parent.createOrReplaceTempView("parents")
    df_child.createOrReplaceTempView("children")

    df_country_continent_contains = spark.sql("""
        SELECT 
            p.parent_geometry, 
            c.child_geometry, 
            p.parent_properties, 
            c.child_properties,
            CASE 
                WHEN ST_Contains(p.parent_geometry, ST_Centroid(c.child_geometry)) THEN 'Contains'
                WHEN ST_DWithin(p.parent_geometry, ST_Centroid(c.child_geometry), 100) THEN 'Within 100m'
                WHEN ST_Touches(p.parent_geometry, c.child_geometry) THEN 'Touches'
                WHEN ST_Intersects(p.parent_geometry, c.child_geometry) THEN 'Intersects'
            END AS relationship_type
        FROM parents p, children c
        WHERE ST_Contains(p.parent_geometry, ST_Centroid(c.child_geometry))
        OR ST_DWithin(p.parent_geometry, ST_Centroid(c.child_geometry), 100)
        OR ST_Touches(p.parent_geometry, c.child_geometry)
        OR ST_Intersects(p.parent_geometry, c.child_geometry)
    """)

    return df_country_continent_contains.select("parent_geometry", "child_geometry", "parent_properties", "child_properties")

In [15]:
from pyspark.sql import DataFrame
from functools import reduce

def union_all(dfs_dict):
    dfs = list(dfs_dict.values())
    if dfs:
        return reduce(DataFrame.unionByName, dfs)
    else:
        return None


In [82]:
def load_resolution(resolution_directory: str):
    resolution_dataframes = {}
    files_directory = f"hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/{resolution_directory}/"

    files = get_files_recursive(files_directory)

    for file_path in files:
        file_name = file_path.split('/')[-1]
        
        print(f"Processing file: {file_path}")

        geojsonSchema = "type string, crs string, totalFeatures long, features array<struct<type string, geometry string, properties map<string, string>>>"
        
        df = spark.read.schema(geojsonSchema).json(file_path, multiLine=True)
        
        df = (df
            .select(F.explode("features").alias("features"))
            .select("features.*")
            .withColumn("geometry", F.expr("ST_GeomFromGeoJSON(geometry)"))
            )
        
        resolution_dataframes[file_name] = df
    return resolution_dataframes

In [90]:
def integrate_at_resolution(df_dataset: DataFrame, dataset:str, resolutions_dir: str, resolution_prefix: str, dataset_unique_id: str, resolution_unique_id: str, verbose=False):
    resolutions_dataframes = load_resolution(resolutions_dir)
    dataset_prefix = lit(dataset + "_")
    final_resolution_edges = {}
    final_resolution_nodes = {}

    for resolutions_file_name, df_resolutions in resolutions_dataframes.items():
        resolutions = resolutions_file_name.replace('.geojson', '')
        print(resolutions)

        df_resolutions_contain_dataset = create_relationships_point(df_resolutions, df_dataset)
        
        df_dataset_isPartOf_resolution_edges = create_edge_df(df_resolutions_contain_dataset, concat(lit(dataset_prefix), col(dataset_unique_id), lit("isPartOf"), concat(lit(resolution_prefix), col(resolution_unique_id)))
        if verbose: df_dataset_isPartOf_resolution_edges.show(n=1, truncate=False)
        final_resolution_edges[f'DatasetIsPartOf{resolutions}Resolutions'] = df_dataset_isPartOf_resolution_edges

        df_resolution_contains_dataset_edges = create_edge_df(df_resolutions_contain_dataset, concat(lit(resolution_prefix), col(resolution_unique_id)), lit("Contains"), concat(lit(dataset_prefix), col(dataset_unique_id)))
        if verbose: df_resolution_contains_dataset_edges.show(n=1, truncate=False)
        final_resolution_edges[f'{resolutions}ResolutionContainsDataset'] = df_resolution_contains_dataset_edges

        df_nodes = create_node_df(df_dataset_isPartOf_resolution_edges, lit(dataset))
        df_nodes = df_nodes.filter(~col("Node_ID").contains(f"{resolution_prefix}_"))
        final_resolution_nodes[f'{resolutions}Resolutions'] = df_nodes
        if verbose: df_nodes.show(truncate=False)
    pass

In [88]:
def integrate_dataset(df_dataset: DataFrame, dataset: str, resolution: str, dataset_unique_id: str, verbose=False):
    if resolution == 'County':
        pass
    elif resolution == 'Tract':
        pass
    elif resolution == 'BlockGroup':
        return integrate_at_resolution(df_dataset, dataset, "BlocksByState", "BlockGroup_", dataset_unique_id, "parent_properties.GEOID", verbose)
    else:
        print(resolution + " isn't a resolution try: County, Tract, BlockGroup")

    pass

# Global Variables

In [77]:
base_edges_path = "hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/graph/BaseEdges.csv"
base_nodes_path = "hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/graph/BaseNodes.csv"

# **Get Blocks (Leaf Nodes)**

In [16]:
blocks_dataframes = {}
files_directory = "hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/BlocksByState/"

files = get_files_recursive(files_directory)

for file_path in files:
    file_name = file_path.split('/')[-1]
    
    print(f"Processing file: {file_path}")

    geojsonSchema = "type string, crs string, totalFeatures long, features array<struct<type string, geometry string, properties map<string, string>>>"
    
    df = spark.read.schema(geojsonSchema).json(file_path, multiLine=True)
    
    df = (df
        .select(F.explode("features").alias("features"))
        .select("features.*")
        .withColumn("geometry", F.expr("ST_GeomFromGeoJSON(geometry)"))
        )
    
    blocks_dataframes[file_name] = df

['hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/BlocksByState/Alabama.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/BlocksByState/Alaska.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/BlocksByState/AmericanSamoa.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/BlocksByState/Arizona.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/BlocksByState/Arkansas.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/BlocksByState/California.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/BlocksByState/Colorado.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/BlocksByState/CommonwealthoftheNorthernMarianaIslands.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/BlocksByState/Connecticut.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/BlocksByState/Delaware.geojson', 'hdfs://columbus-oh.cs.colostate

# Get County Nodes

In [17]:
counties_dataframes = {}
files_directory = "hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/CountiesByState/"
files = get_files_recursive(files_directory)

for file_path in files:
    file_name = file_path.split('/')[-1]
    
    # Print the file path
    print(f"Processing file: {file_name}")

    geojsonSchema = "type string, crs string, totalFeatures long, features array<struct<type string, geometry string, properties map<string, string>>>"

    df = spark.read.schema(geojsonSchema).json(file_path, multiLine=True)
    
    df = (df
        .select(F.explode("features").alias("features"))
        .select("features.*")
        .withColumn("geometry", F.expr("ST_GeomFromGeoJSON(geometry)"))
        )
    
    counties_dataframes[file_name] = df

['hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/CountiesByState/AlabamaCounties.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/CountiesByState/AlaskaCounties.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/CountiesByState/AmericanSamoaCounties.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/CountiesByState/ArizonaCounties.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/CountiesByState/ArkansasCounties.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/CountiesByState/CaliforniaCounties.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/CountiesByState/ColoradoCounties.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/CountiesByState/CommonwealthoftheNorthernMarianaIslandsCounties.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/CountiesByState/ConnecticutCounties.geojson', 'hdfs://columbus-oh.cs.colostate.ed

# Get Tract Nodes

In [18]:
tractss_dataframes = {}
files_directory = "hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/TractsByState/"
files = get_files_recursive(files_directory)

for file_path in files:
    file_name = file_path.split('/')[-1]
    
    # Print the file path
    print(f"Processing file: {file_name}")

    geojsonSchema = "type string, crs string, totalFeatures long, features array<struct<type string, geometry string, properties map<string, string>>>"

    df = spark.read.schema(geojsonSchema).json(file_path, multiLine=True)
    
    df = (df
        .select(F.explode("features").alias("features"))
        .select("features.*")
        .withColumn("geometry", F.expr("ST_GeomFromGeoJSON(geometry)"))
        )
    
    counties_dataframes[file_name] = df

['hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/TractsByState/AlabamaTracts.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/TractsByState/AlaskaTracts.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/TractsByState/AmericanSamoaTracts.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/TractsByState/ArizonaTracts.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/TractsByState/ArkansasTracts.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/TractsByState/CaliforniaTracts.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/TractsByState/ColoradoTracts.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/TractsByState/CommonwealthoftheNorthernMarianaIslandsTracts.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/TractsByState/ConnecticutTracts.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/TractsBySta

In [66]:
def create_dataset_nodes(df_dataset: DataFrame, unique_identifier:str , dataset: str, column_mappings: dict):
    # Generate Node_ID using a prefix from the dataset name and a unique identifier
    dataset_prefix = lit(dataset + "_")
    df_nodes = df_dataset.withColumn("Node_ID", concat(dataset_prefix, col("properties.NAME")))
    
    # Map all specified columns
    for new_col, old_col in column_mappings.items():
        if new_col == 'Type':
            # If the column is 'Type', use a literal value instead of a column reference
            df_nodes = df_nodes.withColumn(new_col, lit(old_col))
        elif new_col != "Node_ID":  # Node_ID is already handled
            df_nodes = df_nodes.withColumn(new_col, col(old_col))
    
    # Handle geometry separately, converting it to GeoJSON (assuming the geometry column is named 'geometry')
    if "geometry" in df_dataset.columns:
        df_nodes = df_nodes.withColumn("geometry", expr("ST_AsGeoJSON(geometry)"))
    
    # # Drop all original columns except those transformed to new ones
    df_nodes = df_nodes.drop("properties")

    # Display the first few rows of the resultant DataFrame
    df_nodes.show(5, truncate=False)
    
    return df_nodes

# GeneralManufacturingFacilities

In [78]:
dataset = 'GeneralManufacturingFacilities'
dataset_nodes_path = f"hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/dataset_graphs/{dataset}Nodes.csv"
dataset_edges_path = f"hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/dataset_graphs/{dataset}Edges.csv"

In [79]:
df_dataset = get_file_to_df(f'{dataset}.geojson')
df_dataset.show(n=1, truncate=False)

Processing file: GeneralManufacturingFacilities.geojson


[Stage 37:>                                                         (0 + 1) / 1]

+-------+------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|type   |geometry                      |properties                                                                                                                                                                                                                                                               

                                                                                

In [80]:

# "properties": {
#     "OBJECTID": 3,
#     "UNIQUE_ID": "N/A",
#     "NAME": "JWS REFRIGERATION & AIR CONDITIONING",
#     "PHONE": "(671) 646-7662",
#     "FAX": "NOT AVAILABLE",
#     "ADDRESS": "290 TUN JOSE SALAS STREET",
#     "ADDRESS2": "SUITE A",
#     "CITY": "TAMUNING",
#     "STATE": "GU",
#     "ZIP": "96913",
#     "ZIP4": "N/A",
#     "COUNTY": "GUAM",
#     "FIPS": "66010",
#     "MADDRESS": "290 TUN JOSE SALAS STREET",
#     "MCITY": "TAMUNING",
#     "MSTATE": "GU",
#     "MZIP": "96913",
#     "MZIP4": "N/A",
#     "DIRECTIONS": "NOT AVAILABLE",
#     "GEOPREC": "BLOCKFACE",
#     "EMP": 0,
#     "PRODUCT": "REFRIGERATION AND AIR-CONDITIONING",
#     "SIC": "3585",
#     "SIC2": "2542",
#     "SIC3": "N/A",
#     "SIC4": "N/A",
#     "NAICS": "N/A",
#     "NAICSDESCR": "NOT AVAILABLE",
#     "WEB": "WWW.JWSGUAM.COM",
#     "LONGITUDE": 144.7883065,
#     "LATITUDE": 13.4912295
# }


general_mfg_mappings = {
    "Type": dataset,
    "Name": "properties.NAME",
    "Phone": "properties.PHONE",
    "Fax": "properties.FAX",
    "Address": "properties.ADDRESS",
    "City": "properties.CITY",
    "State": "properties.STATE",
    "Zip": "properties.ZIP",
    "County": "properties.COUNTY"
}

df_nodes = create_dataset_nodes(df_dataset, "properties.NAME", dataset, general_mfg_mappings)

24/04/24 11:08:32 ERROR TaskSchedulerImpl: Lost executor 12 on 129.82.44.141: Command exited with code 52


+------------------------------+-------------------------------------------------------------+-------------------------------------------------------------------+------------------------------------+--------------+--------------+-----------------------------+--------+-----+-----+-----------------+
|Type                          |geometry                                                     |Node_ID                                                            |Name                                |Phone         |Fax           |Address                      |City    |State|Zip  |County           |
+------------------------------+-------------------------------------------------------------+-------------------------------------------------------------------+------------------------------------+--------------+--------------+-----------------------------+--------+-----+-----+-----------------+
|GeneralManufacturingFacilities|{"type":"Point","coordinates":[144.7883065,13.4912295]}      |GeneralMa

                                                                                

In [None]:
create_csv(df_nodes, dataset_nodes_path)

In [91]:
integrate_dataset(df_dataset, dataset, "BlockGroup", "properties.NAME", verbose=True)

['hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/BlocksByState/Alabama.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/BlocksByState/Alaska.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/BlocksByState/AmericanSamoa.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/BlocksByState/Arizona.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/BlocksByState/Arkansas.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/BlocksByState/California.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/BlocksByState/Colorado.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/BlocksByState/CommonwealthoftheNorthernMarianaIslands.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/BlocksByState/Connecticut.geojson', 'hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/input/BlocksByState/Delaware.geojson', 'hdfs://columbus-oh.cs.colostate

PySparkTypeError: [NOT_ITERABLE] Column is not iterable.

In [None]:
block_prefix = "BlockGroup_"

final_block_edges = {}
final_block_nodes = {}

for blocks_file_name, df_blocks in blocks_dataframes.items():
    blocks = blocks_file_name.replace('.geojson', '')
    print(blocks)
    df_blocks = blocks_dataframes[blocks_file_name]

    df_blocks_contain_dataset = create_relationships_point(df_blocks, df_dataset)
    
    df_dataset_isPartOf_block_edges = create_edge_df(df_blocks_contain_dataset, concat(lit(dataset_prefix), col("child_properties.NAME")), lit("isPartOf"), concat(lit(block_prefix), col("parent_properties.GEOID")))
    # df_dataset_isPartOf_block_edges.show(n=1, truncate=False)
    final_block_edges[f'DatasetIsPartOf{blocks}Blocks'] = df_dataset_isPartOf_block_edges

    df_block_contains_dataset_edges = create_edge_df(df_blocks_contain_dataset, concat(lit(block_prefix), col("parent_properties.GEOID")), lit("Contains"), concat(lit(dataset_prefix), col("child_properties.NAME")))
    # df_block_contains_dataset_edges.show(n=1, truncate=False)
    final_block_edges[f'{blocks}BlockContainsDataset'] = df_block_contains_dataset_edges

    df_nodes = create_node_df(df_dataset_isPartOf_block_edges, lit(dataset))
    df_nodes = df_nodes.filter(~col("Node_ID").contains(f"{block_prefix}_"))
    final_block_nodes[f'{blocks}Blocks'] = df_nodes
    # df_nodes.show(truncate=False)

In [None]:
combined_block_edges_df = union_all(final_block_edges)
combined_block_nodes_df = union_all(final_block_nodes)

In [None]:
append_to_csv(combined_block_nodes_df, base_nodes_path) # Append to Base Graph
append_to_csv(combined_block_edges_df, base_edges_path) # Append to Base Graph
create_csv(combined_block_edges_df, dataset_nodes_path) # Create Dataset Edges File

# HydroCarbonGasLiquidPipelines (Blocks)

In [None]:
dataset = 'HydroCarbonGasLiquidPipelines'

dataset_nodes_path = f"hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/dataset_graphs/{dataset}Nodes.csv"
dataset_edges_path = f"hdfs://columbus-oh.cs.colostate.edu:30785/geospatial/dataset_graphs/{dataset}Edges.csv"

In [None]:
df_dataset = get_file_to_df(f'{dataset}.geojson')
df_dataset.show(n=1, truncate=False)

In [None]:
# "properties": {
#     "FID": 28,
#     "Opername": "ENERGY TRANSFER",
#     "Pipename": "West Texas Pipeline",
#     "Shape_Leng": 9.96270805248,
#     "Shape__Length": 1148322.03802495
# }

dataset_prefix = lit(dataset + "_")

# Attach all properties (Node_ID is the forein id)
df_nodes = df_dataset.withColumn("Node_ID", concat(dataset_prefix, col("properties.Pipename"))) \
                 .withColumn("Type", lit("HydroCarbonGasLiquidPipelines")) \
                 .withColumn("Opername", col("properties.Opername")) \
                 .withColumn("FID", col("properties.FID")) 

df_nodes = df_nodes.withColumn("geometry", expr("ST_AsGeoJSON(geometry)"))

df_nodes = df_nodes.drop("properties")

df_nodes.show(n=5, truncate=False)

In [None]:
create_csv(df_nodes, dataset_nodes_path)

In [None]:
block_prefix = "BlockGroup_"

final_block_edges = {}
final_block_nodes = {}

for blocks_file_name, df_blocks in blocks_dataframes.items():
    blocks = blocks_file_name.replace('.geojson', '')
    print(blocks)
    df_blocks_contain_dataset = create_relationships_geometry(df_blocks, df_dataset)
    
    df_dataset_isPartOf_block_edges = create_edge_df(df_blocks_contain_dataset, concat(lit(dataset_prefix), col("child_properties.Pipename")), lit("isPartOf"), concat(lit(block_prefix), col("parent_properties.GEOID")))
    # df_dataset_isPartOf_block_edges.show(n=1, truncate=False)
    final_block_edges[f'DatasetIsPartOf{blocks}Blocks'] = df_dataset_isPartOf_block_edges

    df_block_contains_dataset_edges = create_edge_df(df_blocks_contain_dataset, concat(lit(block_prefix), col("parent_properties.GEOID")), lit("Contains"), concat(lit(dataset_prefix), col("child_properties.Pipename")))
    # df_block_contains_dataset_edges.show(n=1, truncate=False)
    final_block_edges[f'{blocks}BlockContainsDataset'] = df_block_contains_dataset_edges

    df_nodes = create_node_df(df_block_contains_dataset_edges, lit(dataset))
    df_nodes = df_nodes.filter(~col("Node_ID").contains(f"{dataset_prefix}_"))
    final_block_nodes[f'{blocks}Blocks'] = df_nodes
    # df_nodes.show(truncate=False)


In [None]:
combined_block_edges_df = union_all(final_block_edges)
combined_block_nodes_df = union_all(final_block_nodes)

In [None]:
combined_block_edges_df.count()

In [None]:
combined_block_nodes_df.count()

In [None]:
append_to_csv(combined_block_nodes_df, base_nodes_path) # Append to Base Graph
append_to_csv(combined_block_edges_df, base_edges_path) # Append to Base Graph
create_csv(combined_block_edges_df, dataset_nodes_path) # Create Dataset Edges File

# HydroCarbonGasLiquidPipelines (Counties)

In [None]:
county_prefix = "County_"

final_county_edges = {}
final_county_nodes = {}

for counties_file_name, df_counties in counties_dataframes.items():
    counties = counties_file_name.replace('.geojson', '')
    print(counties)

    df_counties_contain_dataset = create_relationships_geometry(df_counties, df_dataset)
    
    df_dataset_isPartOf_counties_edges = create_edge_df(df_counties_contain_dataset, concat(lit(dataset_prefix), col("child_properties.Pipename")), lit("isPartOf"), concat(lit(county_prefix), col("parent_properties.NAME")))
    df_dataset_isPartOf_counties_edges.show(n=1, truncate=False)
    final_county_edges[f'DatasetIsPartOf{counties}Counties'] = df_dataset_isPartOf_counties_edges

    df_county_contains_dataset_edges = create_edge_df(df_counties_contain_dataset, concat(lit(county_prefix), col("parent_properties.NAME")), lit("Contains"), concat(lit(dataset_prefix), col("child_properties.Pipename")))
    df_county_contains_dataset_edges.show(n=1, truncate=False)
    final_county_edges[f'{counties}CountiesContainsDataset'] = df_county_contains_dataset_edges

    df_nodes = create_node_df(df_county_contains_dataset_edges, lit(dataset))
    df_nodes = df_nodes.filter(~col("Node_ID").contains(f"{dataset_prefix}_"))
    final_county_nodes[f'{blocks}Blocks'] = df_nodes
    df_nodes.show(truncate=False)
    break

In [None]:
combined_county_edges_df = union_all(final_county_edges)
combined_county_nodes_df = union_all(final_county_nodes)

In [None]:
combined_county_edges_df.count()

In [None]:
combined_county_nodes_df.count()

In [None]:
append_to_csv(combined_county_nodes_df, base_nodes_path) # Append to Base Graph
append_to_csv(combined_county_edges_df, base_edges_path) # Append to Base Graph
create_csv(combined_county_edges_df, dataset_nodes_path) # Create Dataset Edges File

# Oil Refineries

In [None]:
dataset = 'OilRefineries'

In [None]:
dataset = 'OilRefineries'

# "properties": {
#     "OBJECTID": 18,
#     "REF_ID": "REF220020",
#     "NAME": "LAKE CHARLES",
#     "ADDRESS": "1601 HWY 108 E",
#     "CITY": "SULPHUR",
#     "STATE": "LA",
#     "ZIP": "70665",
#     "ZIP4": "NOT AVAILABLE",
#     "TELEPHONE": "(337) 708-8431",
#     "TYPE": "MODERN DEEP-CONVERSION FACILITY",
#     "STATUS": "IN SERVICE",
#     "POPULATION": 1183,
#     "COUNTY": "CALCASIEU",
#     "COUNTYFIPS": "22019",
#     "COUNTRY": "USA",
#     "LATITUDE": 30.17866697000005,
#     "LONGITUDE": -93.33023517799995,
#     "NAICS_CODE": "324110",
#     "NAICS_DESC": "PETROLEUM REFINERIES",
#     "SOURCE": "EIA-820; EPA TRI",
#     "SOURCEDATE": "2017/01/01 00:00:00",
#     "VAL_METHOD": "IMAGERY/OTHER",
#     "VAL_DATE": "2018/01/31 00:00:00",
#     "WEBSITE": "http://citgorefining.com",
#     "OWNER": "PDV AMERICA INC",
#     "OPERNAME": "CITGO PETROLEUM CORP",
#     "RMP_ID": "55717",
#     "EPA_ID": "100000140199",
#     "POSREL": "WITHIN 166 FEET",
#     "CAPACITY": 425000,
#     "US_RANK": 6,
#     "CRUDE": 425000,
#     "VACDIST": 230000,
#     "COKING": 85410,
#     "THERMALOP": 0,
#     "CATCRACK": 143000,
#     "CATREFORM": 103035,
#     "CATHYDCRCK": 46000,
#     "CATHYDTRT": 398200,
#     "ALKY": 26400,
#     "POLDIM": 0,
#     "AROMATIC": 20900,
#     "ISOMER": 28000,
#     "LUBES": 0,
#     "OXYGENATES": 0,
#     "HYDRGN": 0,
#     "COKE": 32820,
#     "SULFUR": 717,
#     "ASPHALT": 0
# }


dataset_prefix = lit(dataset + "_")

# Attach all properties (Node_ID is the forein id)
df_nodes = df.withColumn("Node_ID", concat(dataset_prefix, col("properties.NAME"))) \
                 .withColumn("Type", lit("OilRefineries")) \
                 .withColumn("CAPACITY", col("properties.CAPACITY")) 


# NaturalGasStorageFacilities

In [None]:
dataset = 'NaturalGasStorageFacilities'


In [None]:

# "properties": {
#     "FID": 12,
#     "STFID": "STF190012",
#     "NAME": "COLUMBUS CITY",
#     "ADDRESS": "120TH STREET/S AVE",
#     "CITY": "COLUMBUS CITY",
#     "STATE": "IA",
#     "ZIP": "52738",
#     "ZIP4": "NOT AVAILABLE",
#     "TELEPHONE": "NOT AVAILABLE",
#     "TYPE": "AQUIFER",
#     "STATUS": "ACTIVE",
#     "POPULATION": -999,
#     "COUNTY": "LOUISA",
#     "COUNTYFIPS": "19115",
#     "COUNTRY": "USA",
#     "LATITUDE": 41.234864,
#     "LONGITUDE": -91.350155,
#     "NAICS_CODE": "486210",
#     "NAICS_DESC": "STORAGE OF NATURAL GAS",
#     "SOURCE": "EIA, IMAGERY",
#     "SOURCEDATE": "2018/12/01 00:00:00",
#     "VAL_METHOD": "IMAGERY/OTHER",
#     "VAL_DATE": "2019/04/10 00:00:00",
#     "WEBSITE": "http://www.kindermorgan.com/",
#     "EPAID": "155321",
#     "OWNER": "KINDER MORGAN (NATURAL GAS PIPELINE CO OF AMERICA)",
#     "OPERATOR": "NATURAL GAS PIPELINE CO OF AMERICA",
#     "POSREL": "EXCEEDS 1 MILE",
#     "OWNERPCT": 100,
#     "MAXDEL": 175000,
#     "WORKCAP": 16685000,
#     "BASEGAS": 37700000,
#     "TOTALCAP": 54400193,
#     "REGION": "MIDWEST REGION",
#     "PROPMAX": -999,
#     "PROPWORK": -999,
#     "PROPTOTAL": -999,
#     "RESERVNAME": "GALESVILLE MT. SIMON ST. PETER",
#     "SEC_NAICS": "NOT APPLICABLE",
#     "SEC_N_DESC": "NOT APPLICABLE"
# }


dataset_prefix = lit(dataset + "_")

# Attach all properties (Node_ID is the forein id)
df_nodes = df.withColumn("Node_ID", concat(dataset_prefix, col("properties.NAME"))) \
                 .withColumn("Type", lit("NaturalGasStorageFacilities")) \
                 .withColumn("Total Capacity", col("properties.TOTALCAP")) 


# NaturalGasProcessingPlants

In [None]:
dataset = 'NaturalGasProcessingPlants'

In [None]:
# "properties": {
#     "OBJECTID": 5,
#     "NGPPID": "NGPP010177",
#     "NAME": "DOGWOOD OAKS PLANT",
#     "ADDRESS": "21680 HWY 41",
#     "CITY": "BREWTON",
#     "STATE": "AL",
#     "ZIP": "36426",
#     "ZIP4": "NOT AVAILABLE",
#     "TELEPHONE": "(251) 248-2903",
#     "TYPE": "NATURAL GAS PROCESSING PLANT",
#     "STATUS": "ACTIVE",
#     "POPULATION": 12,
#     "COUNTY": "ESCAMBIA",
#     "COUNTYFIPS": "01053",
#     "COUNTRY": "USA",
#     "LATITUDE": 31.243471,
#     "LONGITUDE": -87.187836,
#     "NAICS_CODE": "211130",
#     "NAICS_DESC": "NATURAL GAS EXTRACTION",
#     "SOURCE": "EPA RISK MANAGEMENT PLAN (RMP) - THE RIGHT-TO-KNOW NETWORK",
#     "SOURCEDATE": "2013/03/22 00:00:00",
#     "VAL_METHOD": "IMAGERY/OTHER",
#     "VAL_DATE": "2015/06/17 00:00:00",
#     "WEBSITE": "www.plainsallamerican.com/",
#     "FACID": "100000218356",
#     "COMPNAME": "PLAINS GAS SOLUTIONS, LLC",
#     "POSREL": "WITHIN 40 FEET",
#     "OPERATOR": "CDM MAX, L.L.C. (PLAINS GAS SOLUTIONS, LLC)",
#     "OPERADDR": "333 CLAY STREET, SUITE 1600",
#     "OPERCITY": "HOUSTON",
#     "OPERSTATE": "TX",
#     "OPERCNTRY": "USA",
#     "OPERZIP": "77002",
#     "OPERPHONE": "(251) 248-2903",
#     "OPERURL": "www.plainsallamerican.com/about-us/subsidiary-websites/plains-gas-solutions/facilities",
#     "GASCAP": 4,
#     "PROCAMTBLS": 186840,
#     "BASIN": "GULF COAST COAL REGION",
#     "PLANTFLOW": 4,
#     "BTUCONTENT": 1000,
#     "GASSTORCAP": -999,
#     "LIQSTORCAP": 1000,
#     "RMP_ID": "1000032802",
#     "EPA_ID": "110055375883"
# }


dataset_prefix = lit(dataset + "_")

# Attach all properties (Node_ID is the forein id)
df_nodes = df.withColumn("Node_ID", concat(dataset_prefix, col("properties.NAME"))) \
                 .withColumn("Type", lit("NaturalGasProcessingPlants")) \
                 .withColumn("Plant Flow", col("properties.PLANTFLOW")) 


# NaturalGasCompressorStations

In [None]:
dataset = 'GeographicRegions'

In [None]:
# "properties": { 
#     "scalerank": 7.0, 
#     "featurecla": "Island", 
#     "name": "Adak", 
#     "namealt": null, 
#     "region": "North America", 
#     "subregion": null 
# }

dataset_prefix = lit(dataset + "_")

# Attach all properties (Node_ID is the forein id)
df_nodes = df.withColumn("Node_ID", concat(dataset_prefix, col("properties.name"))) \
                 .withColumn("Type", lit("GeographicRegions")) \
                 .withColumn("Scale Rank", col("properties.scalerank")) 

# PowerPlants

In [None]:
dataset = 'PowerPlants'

In [None]:
# "properties": {
#     "PGM_SYS_AC": "EIA-860",
#     "PGM_SYS_ID": "124",
#     "REGISTRY_I": "110002569569",
#     "PRIMARY_NA": "TUCSON ELECTRIC POWER DEMOSS-PETRIE DSL",
#     "LOCATION_A": "2501 NORTH FLOWING WELLS ROAD",
#     "CITY_NAME": "TUCSON",
#     "COUNTY_NAM": "PIMA",
#     "STATE_CODE": "AZ",
#     "POSTAL_COD": "85705-4015",
#     "FEDERAL_FA": "N",
#     "TRIBAL_LAN": "",
#     "DATA_QUALI": "V",
#     "LAST_REPOR": "",
#     "CREATE_DAT": "2000-03-01",
#     "UPDATE_DAT": "2014-04-30",
#     "LATITUDE83": 32.2523193359375,
#     "LONGITUDE8": -110.99149322509766,
#     "REF_POINT_": "CENTER OF A FACILITY OR STATION",
#     "DERIVED_HU": "15050301",
#     "DERIVED_WB": "150503010906",
#     "DERIVED_CB": "040190045044008",
#     "DERIVED_CD": "02",
#     "OZONE_8HR_": "",
#     "PB_2008_AR": "",
#     "PM25_1997_": "",
#     "PM25_2006_": "",
#     "OZONE_8H_1": "",
#     "UTILITY_ID": "24211",
#     "UTILITY_NA": "Tucson Electric Power Co",
#     "PLANT_CODE": "124",
#     "PLANT_NAME": "Demoss Petrie",
#     "GENERATOR_": "GT2",
#     "PRIME_MOVE": "GT",
#     "STATUS": "OP",
#     "NAMEPLATE": 85,
#     "SUMMER_CAP": 72.19999694824219,
#     "WINTER_CAP": 83.30000305175781,
#     "UNIT_CODE": "",
#     "OPERATING_": "6",
#     "OPERATIN_1": "2001",
#     "ENERGY_SOU": "NG",
#     "ENERGY_S_1": "",
#     "ENERGY_S_2": "",
#     "ENERGY_S_3": "",
#     "ENERGY_S_4": "",
#     "ENERGY_S_5": "",
#     "MULTIPLE_F": "N",
#     "DELIVER_PO": "Y",
#     "SYNCHRONIZ": "",
#     "OWNERSHIP": "S",
#     "TURBINES": ".",
#     "COGENERATO": "N",
#     "SECTOR_NAM": "Electric Utility",
#     "SECTOR": "1",
#     "TOPPING_BO": "",
#     "DUCT_BURNE": "N",
#     "PLANNED_MO": "N",
#     "PLANNED_UP": ".",
#     "PLANNED__1": ".",
#     "PLANNED__2": ".",
#     "PLANNED__3": ".",
#     "PLANNED_DE": ".",
#     "PLANNED__4": ".",
#     "PLANNED__5": ".",
#     "PLANNED__6": ".",
#     "PLANNED_NE": "",
#     "PLANNED_EN": "",
#     "PLANNED_RE": ".",
#     "PLANNED__7": ".",
#     "OTHER_MODS": "",
#     "OTHER_MOD_": ".",
#     "OTHER_MO_1": ".",
#     "PLANNED__8": ".",
#     "PLANNED__9": ".",
#     "SFG_SYSTEM": "N",
#     "PULVERIZED": "",
#     "FLUIDIZED_": "",
#     "SUBCRITICA": "",
#     "SUPERCRITI": "",
#     "ULTRASUPER": "",
#     "CARBONCAPT": "",
#     "STARTUP_SO": "",
#     "STARTUP__1": "",
#     "STARTUP__2": "",
#     "STARTUP__3": "",
#     "ENERGY_SRC": "Natural Gas",
#     "ENERGY_S_6": ""
# }

dataset_prefix = lit(dataset + "_")

# Attach all properties (Node_ID is the forein id)
df_nodes = df.withColumn("Node_ID", concat(dataset_prefix, col("properties.PRIMARY_NA"))) \
                 .withColumn("Type", lit("PowerPlants")) \
                 .withColumn("Summer Capacity", col("properties.SUMMER_CAP")) \
                .withColumn("Winter Capacity", col("properties.WINTER_CAP")) 


# NaturalGasPipelines

In [None]:
dataset = 'NaturalGasPipelines'

In [None]:
# "properties": {
#     "TYPEPIPE": "Intrastate",
#     "Operator": "Crosstex Texas Systems",
#     "Shape_Leng": 0.00187974387,
#     "Shape__Len": 240.3441469695
# }

dataset_prefix = lit(dataset + "_")

# Attach all properties (Node_ID is the forein id)
df_nodes = df.withColumn("Node_ID", concat(dataset_prefix, col("properties.Operator"))) \
                 .withColumn("Type", lit("NaturalGasPipelines")) \
                 .withColumn("Pipe Type", col("properties.TYPEPIPE")) 


# Dams

In [None]:
dataset = 'Dams'

In [None]:
#  "properties": {
#     "OBJECTID": 9144,
#     "NGAID": "10130292",
#     "METLNKID": "",
#     "FEATTYPE": "POLYLINE",
#     "NAME": "EL PASO DAM NO 10",
#     "CITY": "EL PASO",
#     "STATE": "TX",
#     "COUNTY": "EL PASO",
#     "FIPS": "48141",
#     "DIRECTIONS": "",
#     "EMERGTITLE": "",
#     "EMERGPHONE": "",
#     "EMERGEXT": "",
#     "CONTDATE": "1899-11-30T00:00:00.000Z",
#     "CONTHOW": "",
#     "GEODATE": "2007-03-28T00:00:00.000Z",
#     "GEOHOW": "MANUAL",
#     "HSIPTHEMES": "CRITICAL INFRASTUCTURE, PDD-63; WATER SUPPLY; DAMS",
#     "SOURCE": "USACE",
#     "X": -106.4814352,
#     "Y": 31.7778363,
#     "QC_QA": "",
#     "RECORDID": "72827",
#     "OTHER_NAME": "",
#     "FORM_NAME": "",
#     "STATEID": "",
#     "NIDID": "TX07023",
#     "SECTION": "3106-432",
#     "RIVER": "OFF CH-RIO GRANDE",
#     "CITYAFFECT": "EL PASO",
#     "NIDSTATE": "TX",
#     "NIDCOUNTY": "EL PASO",
#     "DISTANCE": 0,
#     "OWN_TYPE": "L",
#     "PRIV_DAM": "",
#     "DAM_TYPE": "RE",
#     "CORE": "XX",
#     "FOUND": "U",
#     "PURPOSES": "C",
#     "YR_COMPL": "",
#     "YR_MOD": "",
#     "DAM_LENGTH": 0,
#     "DAM_HEIGHT": 30,
#     "STR_HEIGHT": 0,
#     "HYD_HEIGHT": 0,
#     "NID_HEIGHT": 30,
#     "MAX_DIS": 0,
#     "MAX_STOR": 24,
#     "NORMAL_STO": 0,
#     "NID_STOR": 24,
#     "SURF_AREA": 0,
#     "DRAIN_AREA": 0,
#     "HAZARD": "H",
#     "EAP": "N",
#     "INSP_DATE": "1996-07-17T00:00:00.000Z",
#     "INSP_FREQU": 0,
#     "ST_REG_DAM": "Y",
#     "ST_REG_AG": "",
#     "SPILL_TYPE": "U",
#     "SPILL_WIDT": 12,
#     "OUT_GATES": "",
#     "VOLUME": 0,
#     "NO_LOCKS": 0,
#     "LEN_LOCKS": 0,
#     "WID_LOCKS": 0,
#     "FED_FUND": "",
#     "FED_DESIGN": "",
#     "FED_CONSTR": "",
#     "FED_REG": "",
#     "FED_INSP": "",
#     "FED_OPER": "",
#     "FED_OWN": "",
#     "FED_OTHER": "",
#     "SOURCE_A": "TX",
#     "SUB_DATE": "20000401",
#     "URL_ADDRES": "HTTP://WWW.TCEQ.STATE.TX.US/",
#     "CONG_DIST": "TX16",
#     "SHAPE_Leng": 216.77816378547902
# }

dataset_prefix = lit(dataset + "_")

# Attach all properties (Node_ID is the forein id)
df_nodes = df.withColumn("Node_ID", concat(dataset_prefix, col("properties.NAME"))) \
                 .withColumn("Type", lit("Dams")) \
                 .withColumn("River", col("properties.RIVER")) 
