In [1]:
import findspark

findspark.init('/home/ubuntu/spark-2.4.4-bin-hadoop2.7')

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, DoubleType, StructField, StructType
import pyspark.sql.functions as sparkf
from functools import reduce

# below is a notebook that goes through some data provided from: https://www.kaggle.com/jonathanbouchet/biodiversity-in-us-national-parks-2016
# and it's just a way for me to discover how to use some pyspark basics


In [28]:
# define variables I use over and over again
PARK_NAME_KEY = "Park Name"
def create_session(name: str):
    return SparkSession.builder.appName(name).getOrCreate()

def get_null_rows(df):
    # returns a df that includes all the rows with any null values     
    return df.where(reduce(lambda x, y: x | y, [sparkf.col(x).isNull() for x in df.columns]))

In [3]:
spark = create_session("parks")

In [4]:
# explicityly create schemas to ensure the data is what we expect it to be, don't want to rely on spark inference
parks_schema = [StructField("Park Code", StringType(), True), StructField("Park Name", StringType(), True), 
                StructField("State", StringType(), True), StructField("Acres", IntegerType(), True),
               StructField("Latitude", DoubleType(), True), StructField("Longitude", DoubleType(), True)]
park_struct = StructType(fields=parks_schema)

species_schema = [StructField("Species ID", StringType(), True), StructField("Park Name", StringType(), True),
                  StructField("Category", StringType(), True), StructField("Order", StringType(), True),
                  StructField("Family", StringType(), True), StructField("Scientific Name", StringType(), True),
                  StructField("Common Names", StringType(), True), StructField("Record Status", StringType(), True),
                  StructField("Occurence", StringType(), True), StructField("Nativeness", StringType(), True),
                  StructField("Abundance", StringType(), True), StructField("Seasonality", StringType(), True),
                  StructField("Conservation Status", StringType(), True)]
species_struct = StructType(fields=species_schema)

In [5]:
# read in data from csv files
parks_df = spark.read.csv("parks.csv", schema=park_struct, header=True)
species_df = spark.read.csv("species.csv", schema=species_struct, header=True)
dfs_to_analyze = [("parks", parks_df), ("species", species_df)]


In [6]:
# check if any of the data frames have null values
for df_name, df in dfs_to_analyze:
    any_nulls = get_null_rows(df)
    if any_nulls.count() != 0:
        print(f"Need to rectify null values in {df_name} dataframe. There are {any_nulls.count()} rows with null values")

Need to rectify null values in species dataframe. There are 116551 rows with null values


In [7]:
# Some prelim data work in order to get a break down of state acreage and number of parks in each state
class StateInfo:
    
    def __init__(self, state_acreages, parks_per_state):
        self.state_acreages = state_acreages
        self.parks_per_state = parks_per_state

def state_summary(df):
    state_acreages = {}
    parks_per_state = {}

    for row in df.rdd.collect():
        # thought could have attributes be case insensitive - unsure
        states = row.State
        # since I know if multiple states then they are delimited with a ", " (comma and then space)
        states = states.split(", ")
        acreage = row.Acres / len(states)
        for state in states:
            if state in state_acreages:
                state_acreages[state] += acreage
                parks_per_state[state] += 1
            else:
                state_acreages[state] = acreage
                parks_per_state[state] = 1
    
    return StateInfo(state_acreages, parks_per_state)

def format_num_values_dict(data_dict):
    str_output_dict = {}
    for key, value in data_dict.items():
        rounded_num = int(round(data_dict[key], 0))
        str_output_dict[key] = f"{rounded_num:,}"
        
    return str_output_dict
        
    

In [8]:
# by state park summary stats
state_info = state_summary(parks_df)
acreage_output = format_num_values_dict(state_info.state_acreages)
print(f"Total Acreage by state (approx, parks spanning across multiple states \n\
      evenly distributed across those states:\n {acreage_output}")
print(f"Parks per state: {state_info.parks_per_state}")

Total Acreage by state (approx, parks spanning across multiple states 
      evenly distributed across those states:
 {'ME': '47,390', 'UT': '838,454', 'SD': '271,051', 'TX': '887,579', 'FL': '1,746,163', 'CO': '393,884', 'NM': '46,766', 'CA': '5,282,470', 'SC': '26,546', 'OR': '183,224', 'OH': '32,950', 'AK': '31,159,251', 'NV': '2,447,636', 'MT': '1,753,502', 'AZ': '1,402,376', 'TN': '260,745', 'NC': '260,745', 'WY': '1,049,925', 'HI': '352,525', 'AR': '5,550', 'MI': '571,790', 'KY': '52,830', 'WA': '1,663,057', 'VA': '199,045', 'ND': '70,447', 'MN': '218,200', 'ID': '739,930'}
Parks per state: {'ME': 1, 'UT': 5, 'SD': 2, 'TX': 2, 'FL': 3, 'CO': 4, 'NM': 1, 'CA': 8, 'SC': 1, 'OR': 1, 'OH': 1, 'AK': 8, 'NV': 2, 'MT': 2, 'AZ': 3, 'TN': 1, 'NC': 1, 'WY': 2, 'HI': 2, 'AR': 1, 'MI': 1, 'KY': 1, 'WA': 3, 'VA': 1, 'ND': 1, 'MN': 1, 'ID': 1}


In [9]:
# some simple summary data points
agg_row = parks_df.agg(sparkf.sum("Acres").alias("Total Acreage"))
total_acreage = agg_row.head()[0]
print(f"Total national park acreage in US: {total_acreage:,}")
print(f"Number of US National Parks: {parks_df.count()}")


Total national park acreage in US: 51,964,032
Number of US National Parks: 56


In [10]:
# sorted list of park names
print(sorted([row[0] for row in parks_df.distinct().select(PARK_NAME_KEY).collect()]))

['Acadia National Park', 'Arches National Park', 'Badlands National Park', 'Big Bend National Park', 'Biscayne National Park', 'Black Canyon of the Gunnison National Park', 'Bryce Canyon National Park', 'Canyonlands National Park', 'Capitol Reef National Park', 'Carlsbad Caverns National Park', 'Channel Islands National Park', 'Congaree National Park', 'Crater Lake National Park', 'Cuyahoga Valley National Park', 'Death Valley National Park', 'Denali National Park and Preserve', 'Dry Tortugas National Park', 'Everglades National Park', 'Gates Of The Arctic National Park and Preserve', 'Glacier Bay National Park and Preserve', 'Glacier National Park', 'Grand Canyon National Park', 'Grand Teton National Park', 'Great Basin National Park', 'Great Sand Dunes National Park and Preserve', 'Great Smoky Mountains National Park', 'Guadalupe Mountains National Park', 'Haleakala National Park', 'Hawaii Volcanoes National Park', 'Hot Springs National Park', 'Isle Royale National Park', 'Joshua Tre

In [11]:
# joining the two dataframes and only selecting the columns we want 
joined_dataframe = parks_df.join(species_df, on=[PARK_NAME_KEY] , how="inner").select([PARK_NAME_KEY, "Acres", "Category", "Order", "Family"])
park_group_species_count = joined_dataframe.groupBy(PARK_NAME_KEY).count()

# checking if some parks have no species data
if park_group_species_count.count() != parks_df.count():
    parks_species_info = [row[PARK_NAME_KEY] for row in park_group_species_count.select(PARK_NAME_KEY).distinct().collect()]
    national_parks_no_info = [row[PARK_NAME_KEY] for row in parks_df.select(PARK_NAME_KEY).distinct().collect() if row[PARK_NAME_KEY] not in parks_species_info]
    print(f"{len(national_parks_no_info)} number of parks have no species info: {national_parks_no_info}")
else:
    print("All parks have species info")

All parks have species info


In [29]:
# Depending on what park names you input into PARK_VARIABLE it outputs the number of different species within
# the park
PARK_VARIABLE = ["Lassen Volcanic National Park", 'Zion National Park', 'Glacier National Park'] # change depending what park(s) you want to see
grouped_df = joined_dataframe.groupBy(PARK_NAME_KEY).count()
number_of_species = grouped_df.filter(grouped_df[PARK_NAME_KEY].isin(PARK_VARIABLE)).collect()
for result in number_of_species:
    print(f"{result[PARK_NAME_KEY]} has {result['count']} different species")

Zion National Park has 1796 different species
Glacier National Park has 2556 different species
Lassen Volcanic National Park has 1797 different species


In [31]:
# takes PARK_VARIABLE and does some deeper analysis on species within each park
PARK_VARIABLE = ["Lassen Volcanic National Park", 'Zion National Park', 'Glacier National Park'] # change depending what park(s) you want to see
category_pivot_df = joined_dataframe.groupBy(PARK_NAME_KEY).pivot("Category").count()
filtered_category_pivot_df = category_pivot_df.filter(category_pivot_df[PARK_NAME_KEY].isin(PARK_VARIABLE))
# filtered_category_pivot_df.show() #commented out as too wide so will print instead
for row in filtered_category_pivot_df.collect():
    print(f"{row[PARK_NAME_KEY]} category breakdown:") # note need keys to be in single quotes - think b/c my f strings use dbl quotes
    for key, value in row.asDict().items():
          if key != PARK_NAME_KEY:
                print(f"\t {key}: {value}")

Zion National Park category breakdown:
	 Algae: None
	 Amphibian: 7
	 Bird: 301
	 Crab/Lobster/Shrimp: None
	 Fish: 15
	 Fungi: None
	 Insect: None
	 Invertebrate: None
	 Mammal: 80
	 Nonvascular Plant: None
	 Reptile: 30
	 Slug/Snail: None
	 Spider/Scorpion: None
	 Vascular Plant: 1363
Glacier National Park category breakdown:
	 Algae: 2
	 Amphibian: 6
	 Bird: 277
	 Crab/Lobster/Shrimp: 6
	 Fish: 27
	 Fungi: 276
	 Insect: 197
	 Invertebrate: 2
	 Mammal: 69
	 Nonvascular Plant: 404
	 Reptile: 4
	 Slug/Snail: 20
	 Spider/Scorpion: None
	 Vascular Plant: 1266
Lassen Volcanic National Park category breakdown:
	 Algae: 2
	 Amphibian: 17
	 Bird: 245
	 Crab/Lobster/Shrimp: 15
	 Fish: 20
	 Fungi: 51
	 Insect: 95
	 Invertebrate: 18
	 Mammal: 100
	 Nonvascular Plant: 160
	 Reptile: 22
	 Slug/Snail: 6
	 Spider/Scorpion: None
	 Vascular Plant: 1046
