In [1]:
# Run this cell to import pyspark and to define start_spark() and stop_spark()

import findspark

findspark.init()

import getpass
import pandas
import pyspark
import random
import re

from IPython.display import display, HTML
from pyspark import SparkContext
from pyspark.sql import SparkSession


# Functions used below

def username():
    """Get username with any domain information removed.
    """

    return re.sub('@.*', '', getpass.getuser())


def dict_to_html(d):
    """Convert a Python dictionary into a two column table for display.
    """

    html = []

    html.append(f'<table width="100%" style="width:100%; font-family: monospace;">')
    for k, v in d.items():
        html.append(f'<tr><td style="text-align:left;">{k}</td><td>{v}</td></tr>')
    html.append(f'</table>')

    return ''.join(html)


def show_as_html(df, n=20):
    """Leverage existing pandas jupyter integration to show a spark dataframe as html.
    
    Args:
        n (int): number of rows to show (default: 20)
    """

    display(df.limit(n).toPandas())

    
def display_spark():
    """Display the status of the active Spark session if one is currently running.
    """
    
    if 'spark' in globals() and 'sc' in globals():

        name = sc.getConf().get("spark.app.name")
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:green">active</span></b>, look for <code>{name}</code> under the running applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'<li><a href="{sc.uiWebUrl}" target="_blank">Spark Application UI</a></li>',
            f'</ul>',
            f'<p><b>Config</b></p>',
            dict_to_html(dict(sc.getConf().getAll())),
            f'<p><b>Notes</b></p>',
            f'<ul>',
            f'<li>The spark session <code>spark</code> and spark context <code>sc</code> global variables have been defined by <code>start_spark()</code>.</li>',
            f'<li>Please run <code>stop_spark()</code> before closing the notebook or restarting the kernel or kill <code>{name}</code> by hand using the link in the Spark UI.</li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))
        
    else:
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:red">stopped</span></b>, confirm that <code>{username() + " (jupyter)"}</code> is under the completed applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))


# Functions to start and stop spark

def start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1):
    """Start a new Spark session and define globals for SparkSession (spark) and SparkContext (sc).
    
    Args:
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): worker memory (default: 1)
        master_memory (float): master memory (default: 1)
    """

    global spark
    global sc

    user = username()
    
    cores = executor_instances * executor_cores
    partitions = cores * 4
    port = 4000 + random.randint(1, 999)

    spark = (
        SparkSession.builder
        .master("spark://masternode2:7077")
        .config("spark.driver.extraJavaOptions", f"-Dderby.system.home=/tmp/{user}/spark/")
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.executor.instances", str(executor_instances))
        .config("spark.executor.cores", str(executor_cores))
        .config("spark.cores.max", str(cores))
        .config("spark.executor.memory", f"{worker_memory}g")
        .config("spark.driver.memory", f"{master_memory}g")
        .config("spark.driver.maxResultSize", "0")
        .config("spark.sql.shuffle.partitions", str(partitions))
        .config("spark.ui.port", str(port))
        .appName(user + " (jupyter)")
        .getOrCreate()
    )
    sc = SparkContext.getOrCreate()
    
    display_spark()

    
def stop_spark():
    """Stop the active Spark session and delete globals for SparkSession (spark) and SparkContext (sc).
    """

    global spark
    global sc

    if 'spark' in globals() and 'sc' in globals():

        spark.stop()

        del spark
        del sc

    display_spark()


# Make css changes to improve spark output readability

html = [
    '<style>',
    'pre { white-space: pre !important; }',
    'table.dataframe td { white-space: nowrap !important; }',
    'table.dataframe thead th:first-child, table.dataframe tbody th { display: none; }',
    '</style>',
]
display(HTML(''.join(html)))

In [2]:
!hdfs dfs -du 

4722      18888      2023_rainfall
17964355  71857420   ass_1_outs
1920      7680       helloworld
31614556  126458224  outputs
30        120        word-count-spark-notebook


# Schemas


In [3]:
#You may increase your resources
#up to 4 executors, 2 cores per executor, 4 GB of executor memory, and 4 GB of master memory.

start_spark(executor_instances=4, executor_cores=2, worker_memory=4, master_memory=4)

0,1
spark.app.startTime,1714622271925
spark.dynamicAllocation.enabled,false
spark.executor.instances,4
spark.ui.port,4893
spark.driver.memory,4g
spark.executor.memory,4g
spark.sql.warehouse.dir,file:/users/home/nki38/spark-warehouse/
spark.master,spark://masternode2:7077
spark.executor.id,driver
spark.executor.cores,2


In [4]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import col

In [5]:
#DAILY
schema = StructType([
    StructField("ID", StringType(), False),
    StructField("DATE", IntegerType(), False),
    StructField("ELEMENT", StringType(), True),
    StructField("VALUE", DoubleType(), True),
    StructField("MEASUREMENT FLAG", StringType(), True),
    StructField("QUALITY FLAG", StringType(), True),
    StructField("SOURCE FLAG", StringType(), True),
    StructField("OBSERVATION TIME", StringType(), True)
])
df_daily = spark.read.csv("hdfs:///data/ghcnd/daily/2020.csv.gz",schema)
df_daily = df_daily.limit(1000)


print(df_daily.count())


1000


In [6]:
#Schemas for states, countries, inventory and stations
def spark_read_fixed_width_format(path, spec):
    return (
        spark.read.text(path)
        .select([
            F.trim(F.substring(F.col('value'), start, end - start + 1)).alias(name).cast(type_)
            for name, start, end, type_ in spec
        ])
        .na.replace('', None)
        .repartition(32)
    )

countries = spark_read_fixed_width_format(
    path="hdfs:///data/ghcnd/ghcnd-countries.txt",
    spec=[
        ('COUNTRY',      1,  2, StringType()),
        ('COUNTRY_NAME', 4, 64, StringType()),
    ]
)


states = spark_read_fixed_width_format(
    path="hdfs:///data/ghcnd/ghcnd-states.txt",
    spec=[
        ('CODE', 1,  2, StringType()),
        ('NAME', 4, 50, StringType()),
    ]
)


inventory = spark_read_fixed_width_format(
    path="hdfs:///data/ghcnd/ghcnd-inventory.txt",
    spec=[
        ('ID',      1,  11, StringType()),
        ('LATITUDE', 13, 20, FloatType()),
        ('LONGITUDE', 22, 30, FloatType()),
        ('ELEMENT', 32, 35, StringType()),
        ('FIRSTYEAR', 37, 40, IntegerType()),
        ('LASTYEAR', 42, 71, IntegerType())
    ]
)




stations = spark_read_fixed_width_format(
    path="hdfs:///data/ghcnd/ghcnd-stations.txt",
    spec=[
        ('ID',      1,  11, StringType()),
        ('LATITUDE', 13, 20, FloatType()),
        ('LONGITUDE', 22, 30, FloatType()),
        ('ELEVATION', 32, 37, FloatType()),
        ('STATE', 39, 40, StringType()),
        ('NAME', 42, 71, StringType()),
        ('GSN_FLAG', 73, 75, StringType()),
        ('HCN_CRN_FLAG', 77, 79, StringType()),
        ('WMO_ID', 81, 85, StringType()), 
    ]
)



print("states")
print(states.count())
show_as_html(states,1)


print("countries")
print(countries.count())
show_as_html(countries,1)

print("inventory")
print(inventory.count())
show_as_html(inventory,1)

print("Stations")
print(stations.count())
show_as_html(stations,1)


without_wmo_id = stations.filter(stations['WMO_ID'].isNull())
print(without_wmo_id.count(), stations.count())

states
74


Unnamed: 0,CODE,NAME
0,WI,WISCONSIN


countries
219


Unnamed: 0,COUNTRY,COUNTRY_NAME
0,RW,Rwanda


inventory
747382


Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEMENT,FIRSTYEAR,LASTYEAR
0,SWE00140810,66.959702,19.82,PRCP,1945,2010


Stations
125983


Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,STATE,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID
0,FIE00142516,60.5,22.766701,52.0,,PAIMIO TAATILA,,,


118023 125983


## Q3


##### Extract the two character country code from each station code in stations and store the output as a new column using the withColumn method.

In [7]:
first_two_chars = F.udf(lambda x: x[:2] if x else None, StringType())
stations_w_country_code = stations.withColumn("COUNTRY", first_two_chars(stations["ID"]))
show_as_html(stations_w_country_code, 1)

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,STATE,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID,COUNTRY
0,FIE00142516,60.5,22.766701,52.0,,PAIMIO TAATILA,,,,FI


##### LEFT JOIN stations with countries using your output from part (a).

In [8]:
stations_with_country_names = stations_w_country_code.join(countries, on="COUNTRY", how="left")
show_as_html(stations_with_country_names, 1)

Unnamed: 0,COUNTRY,ID,LATITUDE,LONGITUDE,ELEVATION,STATE,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID,COUNTRY_NAME
0,US,US1TXCML237,29.680901,-98.120598,201.800003,TX,NEW BRAUNFELS 1.6 S,,,,United States


##### LEFT JOIN stations and states, allowing for the fact that state codes are only provided for stations in the US.

In [9]:
states_alias = states.alias('states')
stations_alias = stations_with_country_names.alias("stations")


joined_data = (
    stations_alias
    .join(
        states_alias,
        (stations_alias["STATE"] == states_alias["CODE"]) & (stations_alias["COUNTRY"] == F.lit("US")),
        how="left"
    )
)

#test_sample = joined_data.filter(F.col("states.CODE").isNotNull())
#test_sample.show()
show_as_html(joined_data)

Unnamed: 0,COUNTRY,ID,LATITUDE,LONGITUDE,ELEVATION,STATE,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID,COUNTRY_NAME,CODE,NAME.1
0,US,USC00188320,39.533298,-79.416702,622.099976,MD,SINES DEEP CREEK 2,,,,United States,MD,MARYLAND
1,US,USW00003996,32.348099,-91.029999,25.299999,LA,TALLULAH VICKSBURG RGNL AP,,,,United States,LA,LOUISIANA
2,US,USC00467207,37.3853,-81.082199,725.700012,WV,PRINCETON,,,,United States,WV,WEST VIRGINIA
3,US,USC00444531,36.966702,-78.116699,152.399994,VA,KENBRIDGE,,,,United States,VA,VIRGINIA
4,US,USC00336512,41.150002,-82.099998,262.100006,OH,PENFIELD 1 SE,,,,United States,OH,OHIO
5,US,USC00146100,39.241901,-95.272499,338.0,KS,OSKALOOSA 4 NE,,,,United States,KS,KANSAS
6,US,USC00415538,30.133301,-97.833298,214.0,TX,MANCHACA,,,,United States,TX,TEXAS
7,US,USC00270275,42.8167,-71.199997,51.200001,NH,ARLINGTON MILLS,,,,United States,NH,NEW HAMPSHIRE
8,US,USC00277833,43.354401,-71.736397,167.600006,NH,SALISBURY,,,,United States,NH,NEW HAMPSHIRE
9,US,USC00124910,40.050598,-86.574402,281.899994,IN,LEBANON 6 W,,,,United States,IN,INDIANA



##### Information
''' 
Based on inventory, what was the first and last year that each station was active and collected any element at all?

How many different elements has each station collected overall?

Further, count separately the number of core elements and the number of ”other” elements
that each station has collected overall.

How many stations collect all five core elements? How many collect only precipitation and
no other elements?

Note that we could also determine the set of elements that each station has collected and
store this output as a new column using pyspark.sql.functions.collect set but it will
be more efficient to first filter inventory by element type using the element column and
then to join against that output as necessary'
'''

In [10]:
inventory_by_years = inventory.groupBy("ID").agg(
    F.min("firstyear"),
    F.max("lastyear")
)
show_as_html(inventory_by_years,4)


inventory_elements_collected_per_site = inventory.groupBy("ID").agg(
    F.countDistinct("ELEMENT").alias("num_elements")
)
show_as_html(inventory_elements_collected_per_site,4)


#PRCP = Precipitation (tenths of mm)
#SNOW = Snowfall (mm)
#SNWD = Snow depth (mm)
#TMAX = Maximum temperature (tenths of degrees C)
#TMIN = Minimum temperature (tenths of degrees C)

core_elements = ['PRCP','SNOW','SNWD','TMAX','TMIN']

inventory_elements_collected_core_vs_not_core = inventory.groupBy("ID").agg(
    F.countDistinct("ELEMENT").alias("num_elements"),
    F.sum(F.when(col("ELEMENT").isin(core_elements), 1).otherwise(0)).alias("num_core_elements"),
    F.sum(F.when(F.col("ELEMENT") == 'PRCP', 1).otherwise(0)).alias("num_precip_elements")
)
show_as_html(inventory_elements_collected_core_vs_not_core,4)


only_precipitation = inventory_elements_collected_core_vs_not_core.filter((F.col('num_elements') ==  F.col('num_precip_elements')))




stations_with_all_core_elements = inventory_elements_collected_core_vs_not_core.filter(
    F.col("num_core_elements") == len(core_elements)
)


print("How many stations collect all five core elements?")
print(stations_with_all_core_elements.count())

print("How many collect only precipitation and no other elements")
print(only_precipitation.count())


Unnamed: 0,ID,min(firstyear),max(lastyear)
0,US1LAEB0005,2008,2018
1,US1MSOK0013,2009,2009
2,US1CTTL0035,2020,2024
3,US1NHGR0055,2020,2023


Unnamed: 0,ID,num_elements
0,US1MOND0003,2
1,US1MDAA0066,4
2,US1NHMR0024,2
3,US1MNIS0006,5


Unnamed: 0,ID,num_elements,num_core_elements,num_precip_elements
0,ASN00044021,11,3,1
1,CA004020560,8,5,1
2,ASN00088130,4,1,1
3,USC00033242,19,5,1


How many stations collect all five core elements?
20467
How many collect only precipitation and no other elements
16301


SyntaxError: invalid syntax (<ipython-input-11-d946f8809dcd>, line 1)

#####  LEFT JOIN stations and your output from part (d)

In [12]:
elements_and_years = inventory_by_years.join(stations_with_all_core_elements, on="ID", how="left")
show_as_html(elements_and_years, 2)

Unnamed: 0,ID,min(firstyear),max(lastyear),num_elements,num_core_elements,num_precip_elements
0,AGE00147719,1888,2024,,,
1,ALE00100939,1940,2000,,,


In [13]:

new_stations = stations_w_country_code.join(elements_and_years, on="ID", how="left")
# new_stations = new_stations.drop(stations_with_all_core_elements["ID"])
show_as_html(new_stations, 4)

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,STATE,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID,COUNTRY,min(firstyear),max(lastyear),num_elements,num_core_elements,num_precip_elements
0,AGE00147719,33.799702,2.89,767.0,,LAGHOUAT,,,60545.0,AG,1888,2024,,,
1,ALE00100939,41.333099,19.7831,89.0,,TIRANA,,,,AL,1940,2000,,,
2,AQC00914873,-14.35,-170.766693,14.9,AS,TAPUTIMU TUTUILA,,,,AQ,1955,1967,12.0,5.0,1.0
3,AR000000002,-29.82,-57.419998,75.0,,BONPLAND,,,,AR,1981,2000,,,


##### LEFT JOIN your 1000 rows subset of daily and your output from part (e). Are there any stations in your subset of daily that are not in stations at all?

In [21]:
joined_df = df_daily.join(new_stations, on="ID", how="left")
joined_df.show(1)
df_daily.show(2)
missing_stations = joined_df.filter(F.col('LATITUDE').isna())
missing_sations.show()


+-----------+--------+-------+-----+----------------+------------+-----------+----------------+--------+---------+---------+-----+--------+--------+------------+------+-------+--------------+-------------+------------+-----------------+-------------------+
|         ID|    DATE|ELEMENT|VALUE|MEASUREMENT FLAG|QUALITY FLAG|SOURCE FLAG|OBSERVATION TIME|LATITUDE|LONGITUDE|ELEVATION|STATE|    NAME|GSN_FLAG|HCN_CRN_FLAG|WMO_ID|COUNTRY|min(firstyear)|max(lastyear)|num_elements|num_core_elements|num_precip_elements|
+-----------+--------+-------+-----+----------------+------------+-----------+----------------+--------+---------+---------+-----+--------+--------+------------+------+-------+--------------+-------------+------------+-----------------+-------------------+
|AGE00147719|20200101|   TMAX|142.0|            null|        null|          S|            null| 33.7997|     2.89|    767.0| null|LAGHOUAT|    null|        null| 60545|     AG|          1888|         2024|        null|           

TypeError: 'Column' object is not callable

In [25]:
missing_stations = joined_df.filter(F.col('LATITUDE').isNull())
missing_stations.show()
print(new_stations.count())
print(joined_df.count())
test = new_stations.filter(F.col('ID') == 'AGM00060640')
test.show()

+-----------+--------+-------+-----+----------------+------------+-----------+----------------+--------+---------+---------+-----+----+--------+------------+------+-------+--------------+-------------+------------+-----------------+-------------------+
|         ID|    DATE|ELEMENT|VALUE|MEASUREMENT FLAG|QUALITY FLAG|SOURCE FLAG|OBSERVATION TIME|LATITUDE|LONGITUDE|ELEVATION|STATE|NAME|GSN_FLAG|HCN_CRN_FLAG|WMO_ID|COUNTRY|min(firstyear)|max(lastyear)|num_elements|num_core_elements|num_precip_elements|
+-----------+--------+-------+-----+----------------+------------+-----------+----------------+--------+---------+---------+-----+----+--------+------------+------+-------+--------------+-------------+------------+-----------------+-------------------+
|AGM00060640|20200101|   TMIN|  6.0|            null|        null|          S|            null|    null|     null|     null| null|null|    null|        null|  null|   null|          null|         null|        null|             null|         

In [26]:
print(joined_df.count())
print(new_stations.rdd.getNumPartitions())

1000
32


In [None]:

filename = "stations_augmented.csv"
output_path = f"hdfs:///user/nki38/outputs/ghcnd/{filename}"
new_stations.repartition(1).write.csv(output_path, header=True, mode="overwrite")

In [None]:
stop_spark()