**Spark notebook**

This notebook will only work in a Jupyter session running on `mathmadslinux2p`.

You can start your own Jupyter session on `mathmadslinux2p` and open this notebook in Chrome on the MADS Windows server by

1. Login to the MADS Windows server using https://mathportal.canterbury.ac.nz/.
2. Download or copy this notebook to your home directory.
3. Open powershell and run `ssh mathmadslinux2p`.
4. Run `start_pyspark_notebook` or `/opt/anaconda3/bin/jupyter-notebook --ip 132.181.129.68 --port $((8000 + $((RANDOM % 999))))`.
5. Copy / paste the url provided in the shell window into Chrome on the MADS Windows server.
6. Open the notebook from the Jupyter root directory (which is your home directory).
7. Run `start_spark()` to start a spark session in the notebook.
8. Run `stop_spark()` before closing the notebook or kill your spark application by hand using the link in the Spark UI.

In [1]:
# Run this cell to import pyspark and to define start_spark() and stop_spark()

import findspark

findspark.init()

import getpass
import pandas as pd
import pyspark
import random
import re

from IPython.display import display, HTML
from pyspark import SparkContext
from pyspark.sql import SparkSession


# Functions used below

def username():
    """Get username with any domain information removed.
    """

    return re.sub('@.*', '', getpass.getuser())


def dict_to_html(d):
    """Convert a Python dictionary into a two column table for display.
    """

    html = []

    html.append(f'<table width="100%" style="width:100%; font-family: monospace;">')
    for k, v in d.items():
        html.append(f'<tr><td style="text-align:left;">{k}</td><td>{v}</td></tr>')
    html.append(f'</table>')

    return ''.join(html)


def show_as_html(df, n=20):
    """Leverage existing pandas jupyter integration to show a spark dataframe as html.
    
    Args:
        n (int): number of rows to show (default: 20)
    """

    display(df.limit(n).toPandas())

    
def display_spark():
    """Display the status of the active Spark session if one is currently running.
    """
    
    if 'spark' in globals() and 'sc' in globals():

        name = sc.getConf().get("spark.app.name")
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:green">active</span></b>, look for <code>{name}</code> under the running applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'<li><a href="{sc.uiWebUrl}" target="_blank">Spark Application UI</a></li>',
            f'</ul>',
            f'<p><b>Config</b></p>',
            dict_to_html(dict(sc.getConf().getAll())),
            f'<p><b>Notes</b></p>',
            f'<ul>',
            f'<li>The spark session <code>spark</code> and spark context <code>sc</code> global variables have been defined by <code>start_spark()</code>.</li>',
            f'<li>Please run <code>stop_spark()</code> before closing the notebook or restarting the kernel or kill <code>{name}</code> by hand using the link in the Spark UI.</li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))
        
    else:
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:red">stopped</span></b>, confirm that <code>{username() + " (jupyter)"}</code> is under the completed applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))


# Functions to start and stop spark

def start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1):
    """Start a new Spark session and define globals for SparkSession (spark) and SparkContext (sc).
    
    Args:
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): worker memory (default: 1)
        master_memory (float): master memory (default: 1)
    """

    global spark
    global sc

    user = username()
    
    cores = executor_instances * executor_cores
    partitions = cores * 4
    port = 4000 + random.randint(1, 999)

    spark = (
        SparkSession.builder
        .master("spark://masternode2:7077")
        .config("spark.driver.extraJavaOptions", f"-Dderby.system.home=/tmp/{user}/spark/")
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.executor.instances", str(executor_instances))
        .config("spark.executor.cores", str(executor_cores))
        .config("spark.cores.max", str(cores))
        .config("spark.executor.memory", f"{worker_memory}g")
        .config("spark.driver.memory", f"{master_memory}g")
        .config("spark.driver.maxResultSize", "0")
        .config("spark.sql.shuffle.partitions", str(partitions))
        .config("spark.ui.port", str(port))
        .appName(user + " (jupyter)")
        .getOrCreate()
    )
    sc = SparkContext.getOrCreate()
    
    display_spark()

    
def stop_spark():
    """Stop the active Spark session and delete globals for SparkSession (spark) and SparkContext (sc).
    """

    global spark
    global sc

    if 'spark' in globals() and 'sc' in globals():

        spark.stop()

        del spark
        del sc

    display_spark()


# Make css changes to improve spark output readability

html = [
    '<style>',
    'pre { white-space: pre !important; }',
    'table.dataframe td { white-space: nowrap !important; }',
    'table.dataframe thead th:first-child, table.dataframe tbody th { display: none; }',
    '</style>',
]
display(HTML(''.join(html)))


# Print function docstrings

help(start_spark)
help(stop_spark)
help(display_spark)
help(show_as_html)

Help on function start_spark in module __main__:

start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1)
    Start a new Spark session and define globals for SparkSession (spark) and SparkContext (sc).
    
    Args:
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): worker memory (default: 1)
        master_memory (float): master memory (default: 1)

Help on function stop_spark in module __main__:

stop_spark()
    Stop the active Spark session and delete globals for SparkSession (spark) and SparkContext (sc).

Help on function display_spark in module __main__:

display_spark()
    Display the status of the active Spark session if one is currently running.

Help on function show_as_html in module __main__:

show_as_html(df, n=20)
    Leverage existing pandas jupyter integration to show a spark dataframe as html.
    
    Args:
        n 

In [2]:
# Run this cell to start a spark session in this notebook

start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1)
#start_spark(executor_instances=4, executor_cores=2, worker_memory=4, master_memory=4)

0,1
spark.dynamicAllocation.enabled,false
spark.driver.extraJavaOptions,-Dderby.system.home=/tmp/abh89/spark/
spark.app.name,abh89 (jupyter)
spark.master,spark://masternode2:7077
spark.driver.port,43679
spark.app.startTime,1663076150207
spark.executor.id,driver
spark.sql.warehouse.dir,file:/users/home/abh89/spark/spark-warehouse
spark.driver.memory,1g
spark.driver.host,mathmadslinux2p.canterbury.ac.nz


In [3]:
# Write your imports and code here or insert cells below

from pyspark.sql import Row, DataFrame, Window, functions as F
from pyspark.sql.types import *

In [None]:
#Processing Q1

In [4]:
! hdfs dfs -ls -h /data/ghcnd/daily

Found 260 items
-rw-r--r--   8 jsw93 supergroup      3.3 K 2021-08-09 15:08 /data/ghcnd/daily/1763.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.2 K 2021-08-09 15:03 /data/ghcnd/daily/1764.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.3 K 2021-08-09 15:03 /data/ghcnd/daily/1765.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.3 K 2021-08-09 14:56 /data/ghcnd/daily/1766.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.3 K 2021-08-09 15:06 /data/ghcnd/daily/1767.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.2 K 2021-08-09 15:02 /data/ghcnd/daily/1768.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.3 K 2021-08-09 15:03 /data/ghcnd/daily/1769.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.3 K 2021-08-09 15:07 /data/ghcnd/daily/1770.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.3 K 2021-08-09 15:06 /data/ghcnd/daily/1771.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.3 K 2021-08-09 15:05 /data/ghcnd/daily/1772.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.3 K 2021-08-09 15:08 /data/ghcnd/daily/1773.c

In [5]:
! hdfs dfs -du -h /data/ghcnd/

15.8 G  126.1 G  /data/ghcnd/daily
3.6 K   28.6 K   /data/ghcnd/ghcnd-countries.txt
31.8 M  254.7 M  /data/ghcnd/ghcnd-inventory.txt
1.1 K   8.5 K    /data/ghcnd/ghcnd-states.txt
10.0 M  80.1 M   /data/ghcnd/ghcnd-stations.txt


In [4]:
#Processing Q2

In [7]:
#Defining schema for daily
schema_daily = StructType([
    StructField("ID", StringType(), True),  #Station code
    StructField("DATE", DateType(), True),  #Observation date formatted as YYYYMMDD
    StructField("ELEMENT", StringType(), True),  #Element type indicator
    StructField("VALUE", DoubleType(), True),  #Data value for ELEMENT
    StructField("MEASUREMENT_FLAG", StringType(), True),  # Measurement Flag
    StructField("QUALITY_FLAG", StringType(), True),  #Quality Flag
    StructField("SOURCE_FLAG", StringType(), True),  #Source Flag
    StructField("OBSERVATION_TIME", TimestampType(), True),  #Observation time formatted as HHMM
])

In [10]:
#Loading 1000 rows of the hdfs:///data/ghcnd/daily/2022.csv.gz file into Spark
daily = (
    spark.read.format("com.databricks.spark.csv")
    .option("header", "false")
    .option("inferSchema", "false")
    .schema(schema_daily)
    .load("hdfs:///data/ghcnd/daily/2022.csv.gz")
    .limit(1000)
)
daily.show(20)


+-----------+----+-------+-----+----------------+------------+-----------+----------------+
|         ID|DATE|ELEMENT|VALUE|MEASUREMENT_FLAG|QUALITY_FLAG|SOURCE_FLAG|OBSERVATION_TIME|
+-----------+----+-------+-----+----------------+------------+-----------+----------------+
|AE000041196|null|   TAVG|204.0|               H|        null|          S|            null|
|AEM00041194|null|   TAVG|211.0|               H|        null|          S|            null|
|AEM00041217|null|   TAVG|209.0|               H|        null|          S|            null|
|AEM00041218|null|   TAVG|207.0|               H|        null|          S|            null|
|AG000060390|null|   TAVG|121.0|               H|        null|          S|            null|
|AG000060590|null|   TAVG|151.0|               H|        null|          S|            null|
|AG000060611|null|   TAVG|111.0|               H|        null|          S|            null|
|AGE00147708|null|   TMIN| 73.0|            null|        null|          S|      

In [11]:
#Changing the datatype for DATE and OBSERVATION_TYPE to StringType() in the schema for daily
schema_daily = StructType([
    StructField("ID", StringType(), True),  #Station code
    StructField("DATE", StringType(), True),  #Observation date formatted as YYYYMMDD
    StructField("ELEMENT", StringType(), True),  #Element type indicator
    StructField("VALUE", DoubleType(), True),  #Data value for ELEMENT
    StructField("MEASUREMENT_FLAG", StringType(), True),  # Measurement Flag
    StructField("QUALITY_FLAG", StringType(), True),  #Quality Flag
    StructField("SOURCE_FLAG", StringType(), True),  #Source Flag
    StructField("OBSERVATION_TIME", StringType(), True),  #Observation time formatted as HHMM
])

In [12]:
#Again loading 1000 rows of the hdfs:///data/ghcnd/daily/2022.csv.gz file into Spark
daily = (
    spark.read.format("com.databricks.spark.csv")
    .option("header", "false")
    .option("inferSchema", "false")
    .schema(schema_daily)
    .load("hdfs:///data/ghcnd/daily/2022.csv.gz")
    .limit(1000)
)
daily.show(20)

+-----------+--------+-------+-----+----------------+------------+-----------+----------------+
|         ID|    DATE|ELEMENT|VALUE|MEASUREMENT_FLAG|QUALITY_FLAG|SOURCE_FLAG|OBSERVATION_TIME|
+-----------+--------+-------+-----+----------------+------------+-----------+----------------+
|AE000041196|20220101|   TAVG|204.0|               H|        null|          S|            null|
|AEM00041194|20220101|   TAVG|211.0|               H|        null|          S|            null|
|AEM00041217|20220101|   TAVG|209.0|               H|        null|          S|            null|
|AEM00041218|20220101|   TAVG|207.0|               H|        null|          S|            null|
|AG000060390|20220101|   TAVG|121.0|               H|        null|          S|            null|
|AG000060590|20220101|   TAVG|151.0|               H|        null|          S|            null|
|AG000060611|20220101|   TAVG|111.0|               H|        null|          S|            null|
|AGE00147708|20220101|   TMIN| 73.0|    

In [13]:
daily.printSchema()

root
 |-- ID: string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- ELEMENT: string (nullable = true)
 |-- VALUE: double (nullable = true)
 |-- MEASUREMENT_FLAG: string (nullable = true)
 |-- QUALITY_FLAG: string (nullable = true)
 |-- SOURCE_FLAG: string (nullable = true)
 |-- OBSERVATION_TIME: string (nullable = true)



In [31]:
#Load Stations 
load_stations = spark.read.text("hdfs:///data/ghcnd/ghcnd-stations.txt")
load_stations.show(10)

+--------------------+
|               value|
+--------------------+
|ACW00011604  17.1...|
|ACW00011647  17.1...|
|AE000041196  25.3...|
|AEM00041194  25.2...|
|AEM00041217  24.4...|
|AEM00041218  24.2...|
|AF000040930  35.3...|
|AFM00040938  34.2...|
|AFM00040948  34.5...|
|AFM00040990  31.5...|
+--------------------+
only showing top 10 rows



In [32]:
#Parse the fixed width text formatting
stations = load_stations.select(
    load_stations.value.substr(1, 11).alias('ID'),
    load_stations.value.substr(13, 8).cast(DoubleType()).alias('LATITUDE'),
    load_stations.value.substr(22, 9).cast(DoubleType()).alias('LONGITUDE'),
    load_stations.value.substr(32, 6).cast(DoubleType()).alias('ELEVATION'),
    load_stations.value.substr(39, 2).alias('STATE'),
    load_stations.value.substr(42, 30).alias('NAME'),
    load_stations.value.substr(73, 3).alias('GSN_FLAG'),
    load_stations.value.substr(77, 3).alias('HCN_CRN_FLAG'), 
    load_stations.value.substr(81, 5).alias('WMO_ID'),
)
stations.show(10)

+-----------+--------+---------+---------+-----+--------------------+--------+------------+------+
|         ID|LATITUDE|LONGITUDE|ELEVATION|STATE|                NAME|GSN_FLAG|HCN_CRN_FLAG|WMO_ID|
+-----------+--------+---------+---------+-----+--------------------+--------+------------+------+
|ACW00011604| 17.1167| -61.7833|     10.1|     |ST JOHNS COOLIDGE...|        |            |      |
|ACW00011647| 17.1333| -61.7833|     19.2|     |ST JOHNS         ...|        |            |      |
|AE000041196|  25.333|   55.517|     34.0|     |SHARJAH INTER. AI...|     GSN|            | 41196|
|AEM00041194|  25.255|   55.364|     10.4|     |DUBAI INTL       ...|        |            | 41194|
|AEM00041217|  24.433|   54.651|     26.8|     |ABU DHABI INTL   ...|        |            | 41217|
|AEM00041218|  24.262|   55.609|    264.9|     |AL AIN INTL      ...|        |            | 41218|
|AF000040930|  35.317|   69.017|   3366.0|     |NORTH-SALANG     ...|     GSN|            | 40930|
|AFM000409

In [16]:
stations.printSchema()

root
 |-- ID: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- ELEVATION: double (nullable = true)
 |-- STATE: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- GSN_FLAG: string (nullable = true)
 |-- HCN_CRN_FLAG: string (nullable = true)
 |-- WMO_ID: string (nullable = true)



In [33]:
stations.count()

122047

In [28]:
def clean_metadata(dataframe):
    """Performes some cleaning operations to make the text data 
    more uniform"""
    for col in dataframe.columns:
        if dataframe.schema[col].dataType == StringType():
            #Remove extra whitespace
            dataframe = dataframe.withColumn(col, F.trim(F.col(col)))
            #Convert empty strings to null columns
            dataframe = dataframe.withColumn(col, F.when(F.col(col)=="",None).otherwise(F.col(col)))
    return dataframe

In [37]:
#Clean stations
stations = clean_metadata(stations)

In [38]:
#stations that do not have a WMO ID
stations.select('ID').where(F.col('WMO_ID').isNull()).count()

113961

In [19]:
#Load States
load_states = spark.read.text("hdfs:///data/ghcnd/ghcnd-states.txt")
load_states.show(10)

+--------------------+
|               value|
+--------------------+
|          AB ALBERTA|
|           AK ALASKA|
|AL ALABAMA       ...|
|         AR ARKANSAS|
|   AS AMERICAN SAMOA|
|          AZ ARIZONA|
| BC BRITISH COLUMBIA|
|       CA CALIFORNIA|
|         CO COLORADO|
|      CT CONNECTICUT|
+--------------------+
only showing top 10 rows



In [20]:
#Parse the fixed width text formatting
states = load_states.select(
    load_states.value.substr(1, 2).alias('CODE'),
    load_states.value.substr(4, 47).alias('NAME'),
)
states.show(10)

+----+--------------------+
|CODE|                NAME|
+----+--------------------+
|  AB|             ALBERTA|
|  AK|              ALASKA|
|  AL|ALABAMA          ...|
|  AR|            ARKANSAS|
|  AS|      AMERICAN SAMOA|
|  AZ|             ARIZONA|
|  BC|    BRITISH COLUMBIA|
|  CA|          CALIFORNIA|
|  CO|            COLORADO|
|  CT|         CONNECTICUT|
+----+--------------------+
only showing top 10 rows



In [40]:
states.printSchema()

root
 |-- CODE: string (nullable = true)
 |-- NAME: string (nullable = true)



In [41]:
states.count()

74

In [39]:
#Clean states
states = clean_metadata(states)

In [22]:
# Load Countries
load_countries = spark.read.text("hdfs:///data/ghcnd/ghcnd-countries.txt")
load_countries.show(10)

+--------------------+
|               value|
+--------------------+
|AC Antigua and Ba...|
|AE United Arab Em...|
|      AF Afghanistan|
|         AG Algeria |
|      AJ Azerbaijan |
|          AL Albania|
|         AM Armenia |
|          AO Angola |
|AQ American Samoa...|
|       AR Argentina |
+--------------------+
only showing top 10 rows



In [23]:
#Parse the fixed width text formatting
countries = load_countries.select(
    load_countries.value.substr(1, 2).alias('CODE'),
    load_countries.value.substr(4, 61).alias('NAME'),
)
countries.show(10)

+----+--------------------+
|CODE|                NAME|
+----+--------------------+
|  AC|Antigua and Barbuda |
|  AE|United Arab Emira...|
|  AF|         Afghanistan|
|  AG|            Algeria |
|  AJ|         Azerbaijan |
|  AL|             Albania|
|  AM|            Armenia |
|  AO|             Angola |
|  AQ|American Samoa [U...|
|  AR|          Argentina |
+----+--------------------+
only showing top 10 rows



In [42]:
#Clean countries
countries = clean_metadata(countries)

In [43]:
countries.printSchema()

root
 |-- CODE: string (nullable = true)
 |-- NAME: string (nullable = true)



In [44]:
countries.count()

219

In [25]:
#Load Inventory
load_inventory = spark.read.text("hdfs:///data/ghcnd/ghcnd-inventory.txt")
load_inventory.show(10)

+--------------------+
|               value|
+--------------------+
|ACW00011604  17.1...|
|ACW00011604  17.1...|
|ACW00011604  17.1...|
|ACW00011604  17.1...|
|ACW00011604  17.1...|
|ACW00011604  17.1...|
|ACW00011604  17.1...|
|ACW00011604  17.1...|
|ACW00011604  17.1...|
|ACW00011604  17.1...|
+--------------------+
only showing top 10 rows



In [26]:
#Parse the fixed width text formatting
inventory = load_inventory.select(
    load_inventory.value.substr(1, 11).alias('ID'),
    load_inventory.value.substr(13, 8).cast(DoubleType()).alias('LATITUDE'),
    load_inventory.value.substr(22, 9).cast(DoubleType()).alias('LONGITUDE'),
    load_inventory.value.substr(32, 4).alias('ELEMENT'),
    load_inventory.value.substr(37, 4).cast(IntegerType()).alias('FIRSTYEAR'),
    load_inventory.value.substr(42, 4).cast(IntegerType()).alias('LASTYEAR'),
)
inventory.show(10)

+-----------+--------+---------+-------+---------+--------+
|         ID|LATITUDE|LONGITUDE|ELEMENT|FIRSTYEAR|LASTYEAR|
+-----------+--------+---------+-------+---------+--------+
|ACW00011604| 17.1167| -61.7833|   TMAX|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   TMIN|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   PRCP|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   SNOW|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   SNWD|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   PGTM|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   WDFG|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   WSFG|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   WT03|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   WT08|     1949|    1949|
+-----------+--------+---------+-------+---------+--------+
only showing top 10 rows



In [46]:
inventory.printSchema()

root
 |-- ID: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- ELEMENT: string (nullable = true)
 |-- FIRSTYEAR: integer (nullable = true)
 |-- LASTYEAR: integer (nullable = true)



In [47]:
inventory.count()

725754

In [45]:
#Clean inventory
inventory = clean_metadata(inventory)

In [31]:
# Processing Q3

In [48]:
#Rename column names for clarity
states_renamed = states.select(
  F.col("CODE").alias("STATE_CODE"),
  F.col("NAME").alias("STATE_NAME")
)
states_renamed.show(10)

+----------+----------------+
|STATE_CODE|      STATE_NAME|
+----------+----------------+
|        AB|         ALBERTA|
|        AK|          ALASKA|
|        AL|         ALABAMA|
|        AR|        ARKANSAS|
|        AS|  AMERICAN SAMOA|
|        AZ|         ARIZONA|
|        BC|BRITISH COLUMBIA|
|        CA|      CALIFORNIA|
|        CO|        COLORADO|
|        CT|     CONNECTICUT|
+----------+----------------+
only showing top 10 rows



In [49]:
#Rename column names for clarity
countries_renamed = countries.select(
  F.col("CODE").alias("COUNTRY_CODE"),
  F.col("NAME").alias("COUNTRY_NAME")
)
countries_renamed.show(10)

+------------+--------------------+
|COUNTRY_CODE|        COUNTRY_NAME|
+------------+--------------------+
|          AC| Antigua and Barbuda|
|          AE|United Arab Emirates|
|          AF|         Afghanistan|
|          AG|             Algeria|
|          AJ|          Azerbaijan|
|          AL|             Albania|
|          AM|             Armenia|
|          AO|              Angola|
|          AQ|American Samoa [U...|
|          AR|           Argentina|
+------------+--------------------+
only showing top 10 rows



In [50]:
#Rename column names for clarity
stations_renamed = stations.select(
  F.col("ID"),
  F.col("LATITUDE"), 
  F.col("LONGITUDE"),
  F.col("ELEVATION"),
  F.col("STATE").alias("STATE_CODE"),
  F.col("NAME").alias("STATION_NAME"),
  F.col("GSN_FLAG"),
  F.col("HCN_CRN_FLAG"),
  F.col("WMO_ID")
)
stations_renamed.show(10)

+-----------+--------+---------+---------+----------+--------------------+--------+------------+------+
|         ID|LATITUDE|LONGITUDE|ELEVATION|STATE_CODE|        STATION_NAME|GSN_FLAG|HCN_CRN_FLAG|WMO_ID|
+-----------+--------+---------+---------+----------+--------------------+--------+------------+------+
|ACW00011604| 17.1167| -61.7833|     10.1|      null|ST JOHNS COOLIDGE...|    null|        null|  null|
|ACW00011647| 17.1333| -61.7833|     19.2|      null|            ST JOHNS|    null|        null|  null|
|AE000041196|  25.333|   55.517|     34.0|      null| SHARJAH INTER. AIRP|     GSN|        null| 41196|
|AEM00041194|  25.255|   55.364|     10.4|      null|          DUBAI INTL|    null|        null| 41194|
|AEM00041217|  24.433|   54.651|     26.8|      null|      ABU DHABI INTL|    null|        null| 41217|
|AEM00041218|  24.262|   55.609|    264.9|      null|         AL AIN INTL|    null|        null| 41218|
|AF000040930|  35.317|   69.017|   3366.0|      null|        NOR

In [51]:
#Renaming for consistency in table name
inventory_renamed = inventory.select(
  F.col("ID"),
  F.col("LATITUDE"), 
  F.col("LONGITUDE"),
  F.col("ELEMENT"),
  F.col("FIRSTYEAR"),
  F.col("LASTYEAR")
)
inventory_renamed.show(10)

+-----------+--------+---------+-------+---------+--------+
|         ID|LATITUDE|LONGITUDE|ELEMENT|FIRSTYEAR|LASTYEAR|
+-----------+--------+---------+-------+---------+--------+
|ACW00011604| 17.1167| -61.7833|   TMAX|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   TMIN|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   PRCP|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   SNOW|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   SNWD|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   PGTM|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   WDFG|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   WSFG|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   WT03|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   WT08|     1949|    1949|
+-----------+--------+---------+-------+---------+--------+
only showing top 10 rows



In [52]:
#Extract country code from ID
stations_renamed = stations_renamed.withColumn("COUNTRY_CODE", F.col("ID")[0:2])
stations_renamed.show(10)

+-----------+--------+---------+---------+----------+--------------------+--------+------------+------+------------+
|         ID|LATITUDE|LONGITUDE|ELEVATION|STATE_CODE|        STATION_NAME|GSN_FLAG|HCN_CRN_FLAG|WMO_ID|COUNTRY_CODE|
+-----------+--------+---------+---------+----------+--------------------+--------+------------+------+------------+
|ACW00011604| 17.1167| -61.7833|     10.1|      null|ST JOHNS COOLIDGE...|    null|        null|  null|          AC|
|ACW00011647| 17.1333| -61.7833|     19.2|      null|            ST JOHNS|    null|        null|  null|          AC|
|AE000041196|  25.333|   55.517|     34.0|      null| SHARJAH INTER. AIRP|     GSN|        null| 41196|          AE|
|AEM00041194|  25.255|   55.364|     10.4|      null|          DUBAI INTL|    null|        null| 41194|          AE|
|AEM00041217|  24.433|   54.651|     26.8|      null|      ABU DHABI INTL|    null|        null| 41217|          AE|
|AEM00041218|  24.262|   55.609|    264.9|      null|         AL

In [53]:
#Left join returns all rows from the left dataset regardless of match found on the right dataset when join expression 
#doesn’t match, it assigns null for that record and drops records from right where match not found.
stations_with_country_name = (
  stations_renamed
  .join(
    countries_renamed,
    on="COUNTRY_CODE",
    how="left"
  )
)   
stations_with_country_name.show(10) 

+------------+-----------+--------+---------+---------+----------+--------------------+--------+------------+------+--------------------+
|COUNTRY_CODE|         ID|LATITUDE|LONGITUDE|ELEVATION|STATE_CODE|        STATION_NAME|GSN_FLAG|HCN_CRN_FLAG|WMO_ID|        COUNTRY_NAME|
+------------+-----------+--------+---------+---------+----------+--------------------+--------+------------+------+--------------------+
|          AC|ACW00011604| 17.1167| -61.7833|     10.1|      null|ST JOHNS COOLIDGE...|    null|        null|  null| Antigua and Barbuda|
|          AC|ACW00011647| 17.1333| -61.7833|     19.2|      null|            ST JOHNS|    null|        null|  null| Antigua and Barbuda|
|          AE|AE000041196|  25.333|   55.517|     34.0|      null| SHARJAH INTER. AIRP|     GSN|        null| 41196|United Arab Emirates|
|          AE|AEM00041194|  25.255|   55.364|     10.4|      null|          DUBAI INTL|    null|        null| 41194|United Arab Emirates|
|          AE|AEM00041217|  24.433

In [54]:
#Left join stations and countries with states.
stations_with_country_and_state_name = (
  stations_with_country_name
  .join(
    states_renamed,
    on="STATE_CODE",
    how="left"
  )
)   
stations_with_country_and_state_name.show(10)

+----------+------------+-----------+--------+---------+---------+--------------------+--------+------------+------+--------------------+----------+
|STATE_CODE|COUNTRY_CODE|         ID|LATITUDE|LONGITUDE|ELEVATION|        STATION_NAME|GSN_FLAG|HCN_CRN_FLAG|WMO_ID|        COUNTRY_NAME|STATE_NAME|
+----------+------------+-----------+--------+---------+---------+--------------------+--------+------------+------+--------------------+----------+
|      null|          AC|ACW00011604| 17.1167| -61.7833|     10.1|ST JOHNS COOLIDGE...|    null|        null|  null| Antigua and Barbuda|      null|
|      null|          AC|ACW00011647| 17.1333| -61.7833|     19.2|            ST JOHNS|    null|        null|  null| Antigua and Barbuda|      null|
|      null|          AE|AE000041196|  25.333|   55.517|     34.0| SHARJAH INTER. AIRP|     GSN|        null| 41196|United Arab Emirates|      null|
|      null|          AE|AEM00041194|  25.255|   55.364|     10.4|          DUBAI INTL|    null|        nu

In [56]:
stations_with_country_and_state_name.select('STATE_CODE','STATE_NAME','COUNTRY_CODE','COUNTRY_NAME').where((F.col('COUNTRY_CODE')!='US') & (F.col('STATE_NAME')!='null')).distinct().show(50)

+----------+--------------------+------------+--------------------+
|STATE_CODE|          STATE_NAME|COUNTRY_CODE|        COUNTRY_NAME|
+----------+--------------------+------------+--------------------+
|        BC|    BRITISH COLUMBIA|          CA|              Canada|
|        MH|    MARSHALL ISLANDS|          RM|    Marshall Islands|
|        VI|      VIRGIN ISLANDS|          VQ|Virgin Islands [U...|
|        NU|             NUNAVUT|          CA|              Canada|
|        AB|             ALBERTA|          CA|              Canada|
|        QC|              QUEBEC|          CA|              Canada|
|        UM|U.S. MINOR OUTLYI...|          LQ|Palmyra Atoll [Un...|
|        NT|NORTHWEST TERRITO...|          CA|              Canada|
|        PE|PRINCE EDWARD ISLAND|          CA|              Canada|
|        NL|NEWFOUNDLAND AND ...|          CA|              Canada|
|        UM|U.S. MINOR OUTLYI...|          JQ|Johnston Atoll [U...|
|        UM|U.S. MINOR OUTLYI...|          WQ|Wa

In [58]:
inventory_renamed.show(10)

+-----------+--------+---------+-------+---------+--------+
|         ID|LATITUDE|LONGITUDE|ELEMENT|FIRSTYEAR|LASTYEAR|
+-----------+--------+---------+-------+---------+--------+
|ACW00011604| 17.1167| -61.7833|   TMAX|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   TMIN|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   PRCP|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   SNOW|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   SNWD|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   PGTM|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   WDFG|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   WSFG|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   WT03|     1949|    1949|
|ACW00011604| 17.1167| -61.7833|   WT08|     1949|    1949|
+-----------+--------+---------+-------+---------+--------+
only showing top 10 rows



In [42]:
inventory_renamed.printSchema()

root
 |-- ID: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- ELEMENT: string (nullable = true)
 |-- FIRSTYEAR: integer (nullable = true)
 |-- LASTYEAR: integer (nullable = true)



In [3]:
core_elements = ["PRCP", "SNOW", "SNWD", "TMAX", "TMIN"]  # define the core elements as a list

In [60]:
#Temporary table to count core and other elements using array functions
temp = (
    inventory_renamed
    .groupby(["ID"])
    .agg(
        F.collect_list(F.col("ELEMENT")).alias("ELEMENTS"),
        F.collect_set(F.col("ELEMENT")).alias("ELEMENTS_DISTINCT"),
        F.min(F.col("FIRSTYEAR")).alias("START_YEAR"),  
        F.max(F.col("LASTYEAR")).alias("END_YEAR"),
    )
    .select( 
        F.col("ID"),
        F.col("START_YEAR"),
        F.col("END_YEAR"),
        F.col("ELEMENTS"),
        F.col("ELEMENTS_DISTINCT"),
        F.size(F.col("ELEMENTS_DISTINCT")).alias("NUM_ELEMENTS"),
        F.array_intersect(F.col("ELEMENTS_DISTINCT"), F.array([F.lit(x) for x in core_elements])).alias("CORE_ELEMENTS"),
        F.size(F.array_intersect(F.col("ELEMENTS_DISTINCT"), F.array([F.lit(x) for x in core_elements]))).alias("NUM_CORE_ELEMENTS"),
        F.array_except(F.col("ELEMENTS_DISTINCT"), F.array([F.lit(x) for x in core_elements])).alias("OTHER_ELEMENTS"),
        F.size(F.array_except(F.col("ELEMENTS_DISTINCT"), F.array([F.lit(x) for x in core_elements]))).alias("NUM_OTHER_ELEMENTS"),

    )
    .sort(F.col("ID"))
)
temp.show(10)

+-----------+----------+--------+--------------------+--------------------+------------+--------------------+-----------------+--------------------+------------------+
|         ID|START_YEAR|END_YEAR|            ELEMENTS|   ELEMENTS_DISTINCT|NUM_ELEMENTS|       CORE_ELEMENTS|NUM_CORE_ELEMENTS|      OTHER_ELEMENTS|NUM_OTHER_ELEMENTS|
+-----------+----------+--------+--------------------+--------------------+------------+--------------------+-----------------+--------------------+------------------+
|ACW00011604|      1949|    1949|[TMAX, TMIN, PRCP...|[TMAX, TMIN, WSFG...|          11|[TMAX, TMIN, PRCP...|                5|[WSFG, WDFG, PGTM...|                 6|
|ACW00011647|      1957|    1970|[TMAX, TMIN, PRCP...|[TMAX, TMIN, PRCP...|           7|[TMAX, TMIN, PRCP...|                5|        [WT16, WT03]|                 2|
|AE000041196|      1944|    2022|[TMAX, TMIN, PRCP...|[TMAX, TMIN, PRCP...|           4|  [TMAX, TMIN, PRCP]|                3|              [TAVG]|            

In [66]:
#Copy temp to hdfs
temp.write.parquet("/user/abh89/spark/outputs/ghcnd/inventory.parquet")

In [67]:
#Copy from hdfs to local
!hdfs dfs -copyToLocal /user/abh89/spark/outputs/ghcnd/inventory.parquet ~/spark/outputs/ghcnd/inventory.parquet

In [70]:
temp.count()

122010

In [46]:
temp.printSchema()

root
 |-- ID: string (nullable = true)
 |-- START YEAR: integer (nullable = true)
 |-- END YEAR: integer (nullable = true)
 |-- ELEMENTS: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- ELEMENTS DISTINCT: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- NUM ELEMENTS: integer (nullable = false)
 |-- CORE ELEMENTS: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- NUM CORE ELEMENTS: integer (nullable = false)
 |-- OTHER ELEMENTS: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- NUM OTHER ELEMENTS: integer (nullable = false)



In [72]:
#Sum of different elements collected by each station
temp.agg(F.sum("NUM_ELEMENTS")).show()

+-----------------+
|sum(NUM_ELEMENTS)|
+-----------------+
|           725754|
+-----------------+



In [75]:
#Sum of core elements collected by each station
temp.agg(F.sum("NUM_CORE_ELEMENTS")).show()

+----------------------+
|sum(NUM_CORE_ELEMENTS)|
+----------------------+
|                336814|
+----------------------+



In [76]:
#Sum of other elements collected by each station
temp.agg(F.sum("NUM_OTHER_ELEMENTS")).show()

+-----------------------+
|sum(NUM_OTHER_ELEMENTS)|
+-----------------------+
|                 388940|
+-----------------------+



In [71]:
#Num of stations collecting all 5 core elements
temp.select('ID').where(F.col('NUM_CORE_ELEMENTS') == 5).count()

20300

In [78]:
#collecting only precipitation and no other element.
temp.where((F.array_contains(F.col('ELEMENTS'),'PRCP'))&(F.col('NUM_ELEMENTS') == 1)).count()

16159

In [79]:
#Left join stations_with_country_and_state_name with inventory
#This is enriched stations data
stations_with_country_state_name_inventory = (
  stations_with_country_and_state_name
  .join(
    temp,
    on="ID",
    how="left"
  )
    .na.fill(0)
)   
stations_with_country_state_name_inventory.show(10)

+-----------+----------+------------+--------+---------+---------+--------------------+--------+------------+------+--------------------+----------+----------+--------+--------------------+--------------------+------------+--------------------+-----------------+--------------+------------------+
|         ID|STATE_CODE|COUNTRY_CODE|LATITUDE|LONGITUDE|ELEVATION|        STATION_NAME|GSN_FLAG|HCN_CRN_FLAG|WMO_ID|        COUNTRY_NAME|STATE_NAME|START_YEAR|END_YEAR|            ELEMENTS|   ELEMENTS_DISTINCT|NUM_ELEMENTS|       CORE_ELEMENTS|NUM_CORE_ELEMENTS|OTHER_ELEMENTS|NUM_OTHER_ELEMENTS|
+-----------+----------+------------+--------+---------+---------+--------------------+--------+------------+------+--------------------+----------+----------+--------+--------------------+--------------------+------------+--------------------+-----------------+--------------+------------------+
|AEM00041217|      null|          AE|  24.433|   54.651|     26.8|      ABU DHABI INTL|    null|        null|

In [41]:
#Copy enriched stations data to hdfs
stations_with_country_state_name_inventory.write.parquet("/user/abh89/spark/outputs/ghcnd/stations_enriched.parquet")

In [44]:
!hdfs dfs -ls /user/abh89/spark/outputs/ghcnd/stations_enriched.parquet 

Found 33 items
-rw-r--r--   4 abh89 abh89          0 2022-09-11 04:07 /user/abh89/spark/outputs/ghcnd/stations_enriched.parquet/_SUCCESS
-rw-r--r--   4 abh89 abh89     203029 2022-09-11 04:07 /user/abh89/spark/outputs/ghcnd/stations_enriched.parquet/part-00000-40e4a51f-89d9-4d4c-8889-caed61a262dd-c000.snappy.parquet
-rw-r--r--   4 abh89 abh89     204560 2022-09-11 04:07 /user/abh89/spark/outputs/ghcnd/stations_enriched.parquet/part-00001-40e4a51f-89d9-4d4c-8889-caed61a262dd-c000.snappy.parquet
-rw-r--r--   4 abh89 abh89     204672 2022-09-11 04:07 /user/abh89/spark/outputs/ghcnd/stations_enriched.parquet/part-00002-40e4a51f-89d9-4d4c-8889-caed61a262dd-c000.snappy.parquet
-rw-r--r--   4 abh89 abh89     209437 2022-09-11 04:07 /user/abh89/spark/outputs/ghcnd/stations_enriched.parquet/part-00003-40e4a51f-89d9-4d4c-8889-caed61a262dd-c000.snappy.parquet
-rw-r--r--   4 abh89 abh89     202387 2022-09-11 04:07 /user/abh89/spark/outputs/ghcnd/stations_enriched.parquet/part-00004-40e4a51f-

In [45]:
#Copy from hdfs to local
!hdfs dfs -copyToLocal /user/abh89/spark/outputs/ghcnd/stations_enriched.parquet ~/spark/outputs/ghcnd/stations_enriched.parquet

In [80]:
daily.show(10)

+-----------+--------+-------+-----+----------------+------------+-----------+----------------+
|         ID|    DATE|ELEMENT|VALUE|MEASUREMENT_FLAG|QUALITY_FLAG|SOURCE_FLAG|OBSERVATION_TIME|
+-----------+--------+-------+-----+----------------+------------+-----------+----------------+
|AE000041196|20220101|   TAVG|204.0|               H|        null|          S|            null|
|AEM00041194|20220101|   TAVG|211.0|               H|        null|          S|            null|
|AEM00041217|20220101|   TAVG|209.0|               H|        null|          S|            null|
|AEM00041218|20220101|   TAVG|207.0|               H|        null|          S|            null|
|AG000060390|20220101|   TAVG|121.0|               H|        null|          S|            null|
|AG000060590|20220101|   TAVG|151.0|               H|        null|          S|            null|
|AG000060611|20220101|   TAVG|111.0|               H|        null|          S|            null|
|AGE00147708|20220101|   TMIN| 73.0|    

In [81]:
#Left join subset of daily and stations_with_country_state_name_inventory(enriched stations)
daily_with_stations = (
  daily
  .join(
    stations_with_country_state_name_inventory,
    on="ID",
    how="left"
  )
)   
daily_with_stations.show(10)

+-----------+--------+-------+-----+----------------+------------+-----------+----------------+----------+------------+--------+---------+---------+--------------+--------+------------+------+--------------------+----------+----------+--------+--------------------+--------------------+------------+--------------------+-----------------+--------------+------------------+
|         ID|    DATE|ELEMENT|VALUE|MEASUREMENT_FLAG|QUALITY_FLAG|SOURCE_FLAG|OBSERVATION_TIME|STATE_CODE|COUNTRY_CODE|LATITUDE|LONGITUDE|ELEVATION|  STATION_NAME|GSN_FLAG|HCN_CRN_FLAG|WMO_ID|        COUNTRY_NAME|STATE_NAME|START_YEAR|END_YEAR|            ELEMENTS|   ELEMENTS_DISTINCT|NUM_ELEMENTS|       CORE_ELEMENTS|NUM_CORE_ELEMENTS|OTHER_ELEMENTS|NUM_OTHER_ELEMENTS|
+-----------+--------+-------+-----+----------------+------------+-----------+----------------+----------+------------+--------+---------+---------+--------------+--------+------------+------+--------------------+----------+----------+--------+----------

In [85]:
daily_with_stations.select('ID').where(F.col('END_YEAR').isNull()).show(10)

+---+
| ID|
+---+
+---+



In [86]:
#Analysis Q1

In [87]:
#Total number of stations
stations_with_country_state_name_inventory.count()

122047

In [91]:
#Stations active in 2021
stations_with_country_state_name_inventory.select('ID').where((F.col('END_YEAR')> 2020)&(F.col('START_YEAR')!=2022)).count()

42588

In [100]:
#Stations in GSN
stations_with_country_state_name_inventory.select('ID').where(F.col('GSN_FLAG')=='GSN').count()

991

In [101]:
#Stations in HCN
stations_with_country_state_name_inventory.select('ID').where(F.col('HCN_CRN_FLAG')=='HCN').count()

1218

In [103]:
#Stations in CRN
stations_with_country_state_name_inventory.select('ID').where(F.col('HCN_CRN_FLAG')=='CRN').count()

0

In [104]:
#Stations in GSN and HCN
stations_with_country_state_name_inventory.select('ID').where((F.col('HCN_CRN_FLAG')=='HCN')&(F.col('GSN_FLAG')=='GSN')).count()

14

In [105]:
#Create a temporary table temp2 for aggregation
temp2 = (
  stations_with_country_state_name_inventory
  .groupBy(["COUNTRY_CODE"])
  .agg(
    F.countDistinct(F.col("ID")).cast(IntegerType()).alias("NUM"),
  )
    .sort(F.col("COUNTRY_CODE"))
)    
    
temp2.show(20)

+------------+-----+
|COUNTRY_CODE|  NUM|
+------------+-----+
|          AC|    2|
|          AE|    4|
|          AF|    4|
|          AG|   87|
|          AJ|   66|
|          AL|    3|
|          AM|   53|
|          AO|    6|
|          AQ|   21|
|          AR|  101|
|          AS|17088|
|          AU|   13|
|          AY|  102|
|          BA|    1|
|          BB|    1|
|          BC|   21|
|          BD|    2|
|          BE|    1|
|          BF|   51|
|          BG|   10|
+------------+-----+
only showing top 20 rows



In [106]:
#Join temporary table and countries
#Update column name using withColumnRenamed
countries_updated = (
    countries_renamed
    .join(temp2.withColumnRenamed("NUM","NUM_STATIONS"), on="COUNTRY_CODE", how="left")
    .na.fill(0)
    .sort(F.col("COUNTRY_CODE"))
)

countries_updated.show(10)

+------------+--------------------+------------+
|COUNTRY_CODE|        COUNTRY_NAME|NUM_STATIONS|
+------------+--------------------+------------+
|          AC| Antigua and Barbuda|           2|
|          AE|United Arab Emirates|           4|
|          AF|         Afghanistan|           4|
|          AG|             Algeria|          87|
|          AJ|          Azerbaijan|          66|
|          AL|             Albania|           3|
|          AM|             Armenia|          53|
|          AO|              Angola|           6|
|          AQ|American Samoa [U...|          21|
|          AR|           Argentina|         101|
+------------+--------------------+------------+
only showing top 10 rows



In [107]:
#Save the data to hdfs
countries_updated.repartition(1).write.csv("/user/abh89/spark/outputs/ghcnd/countries.csv")

In [108]:
#Copy to local
!hdfs dfs -copyToLocal /user/abh89/spark/outputs/ghcnd/countries.csv ~/spark/outputs/ghcnd/countries.csv

In [109]:
#Create a temporary table temp3 for aggregation
temp3 = (
  stations_with_country_state_name_inventory
  .groupBy(["STATE_CODE"])
  .agg(
    F.countDistinct(F.col("ID")).cast(IntegerType()).alias("NUM"),
  )
    .sort(F.col("STATE_CODE"))
)    
    
temp3.show(20)

+----------+-----+
|STATE_CODE|  NUM|
+----------+-----+
|      null|43969|
|        AB| 1440|
|        AK| 1018|
|        AL| 1059|
|        AR|  901|
|        AS|   21|
|        AZ| 1604|
|        BC| 1709|
|        BH|   45|
|        CA| 2977|
|        CO| 4457|
|        CT|  397|
|        DC|   17|
|        DE|  134|
|        FL| 2021|
|        FM|   38|
|        GA| 1329|
|        GU|   21|
|        HI|  762|
|        IA|  955|
+----------+-----+
only showing top 20 rows



In [110]:
#Join temporary table and states
#Update column name using withColumnRenamed
states_updated = (
    states_renamed
    .join(temp3.withColumnRenamed("NUM","NUM_STATIONS"), on="STATE_CODE", how="left")
    .na.fill(0)
    .sort(F.col("STATE_CODE"))
)

states_updated.show(10)

+----------+----------------+------------+
|STATE_CODE|      STATE_NAME|NUM_STATIONS|
+----------+----------------+------------+
|        AB|         ALBERTA|        1440|
|        AK|          ALASKA|        1018|
|        AL|         ALABAMA|        1059|
|        AR|        ARKANSAS|         901|
|        AS|  AMERICAN SAMOA|          21|
|        AZ|         ARIZONA|        1604|
|        BC|BRITISH COLUMBIA|        1709|
|        CA|      CALIFORNIA|        2977|
|        CO|        COLORADO|        4457|
|        CT|     CONNECTICUT|         397|
+----------+----------------+------------+
only showing top 10 rows



In [111]:
#Save the data to hdfs
states_updated.repartition(1).write.csv("/user/abh89/spark/outputs/ghcnd/states.csv")

In [112]:
#Copy to local
!hdfs dfs -copyToLocal /user/abh89/spark/outputs/ghcnd/states.csv ~/spark/outputs/ghcnd/states.csv

In [113]:
#Stations in southern hemisphere
stations_with_country_state_name_inventory.select('ID').where(F.col('LATITUDE')<0).count()

25337

In [133]:
#Finding US territories
US_territories = (stations_with_country_and_state_name.select('STATE_CODE','STATE_NAME','COUNTRY_CODE','COUNTRY_NAME')
                  .where((F.col('COUNTRY_CODE')!='US') & (F.col('STATE_NAME')!='null')& (F.col('COUNTRY_NAME').contains('United States')))
                  .distinct())
US_territories.show(10)

+----------+--------------------+------------+--------------------+
|STATE_CODE|          STATE_NAME|COUNTRY_CODE|        COUNTRY_NAME|
+----------+--------------------+------------+--------------------+
|        VI|      VIRGIN ISLANDS|          VQ|Virgin Islands [U...|
|        UM|U.S. MINOR OUTLYI...|          LQ|Palmyra Atoll [Un...|
|        UM|U.S. MINOR OUTLYI...|          JQ|Johnston Atoll [U...|
|        UM|U.S. MINOR OUTLYI...|          WQ|Wake Island [Unit...|
|        AS|      AMERICAN SAMOA|          AQ|American Samoa [U...|
|        GU|                GUAM|          GQ|Guam [United States]|
|        UM|U.S. MINOR OUTLYI...|          MQ|Midway Islands [U...|
|        MP|NORTHERN MARIANA ...|          CQ|Northern Mariana ...|
|        PR|         PUERTO RICO|          RQ|Puerto Rico [Unit...|
+----------+--------------------+------------+--------------------+



In [134]:
#Count of stations in US territories
#Leftjoin US territories and countries as there are duplicate state code for different country code
US_territories = (
    US_territories
    .join(countries_updated, on="COUNTRY_CODE", how="left")
    .na.fill(0)
    .select('STATE_CODE','COUNTRY_CODE','NUM_STATIONS')
)

US_territories.show(10)

+----------+------------+------------+
|STATE_CODE|COUNTRY_CODE|NUM_STATIONS|
+----------+------------+------------+
|        MP|          CQ|          11|
|        UM|          WQ|           1|
|        AS|          AQ|          21|
|        UM|          LQ|           3|
|        GU|          GQ|          21|
|        UM|          MQ|           2|
|        UM|          JQ|           4|
|        VI|          VQ|          64|
|        PR|          RQ|         227|
+----------+------------+------------+



In [135]:
#Sum of stations in US territories
US_territories.agg(F.sum("NUM_STATIONS")).show()

+-----------------+
|sum(NUM_STATIONS)|
+-----------------+
|              354|
+-----------------+



In [None]:
#Analysis Q2

In [67]:
stations_with_country_state_name_inventory.show(10)

+-----------+----------+------------+--------+---------+---------+--------------------+--------+-----------+------+--------------------+--------------+----------+--------+--------------------+--------------------+------------+--------------------+-----------------+--------------------+------------------+
|         ID|STATE CODE|COUNTRY CODE|LATITUDE|LONGITUDE|ELEVATION|        STATION NAME|GSN FLAG|HCNSRN FLAG|WMO ID|        COUNTRY NAME|    STATE NAME|START YEAR|END YEAR|            ELEMENTS|   ELEMENTS DISTINCT|NUM ELEMENTS|       CORE ELEMENTS|NUM CORE ELEMENTS|      OTHER ELEMENTS|NUM OTHER ELEMENTS|
+-----------+----------+------------+--------+---------+---------+--------------------+--------+-----------+------+--------------------+--------------+----------+--------+--------------------+--------------------+------------+--------------------+-----------------+--------------------+------------------+
|AGE00147719|          |          AG| 33.7997|     2.89|    767.0|LAGHOUAT        

In [136]:
from math import radians, cos, sin, asin, sqrt

In [138]:
#Filter stations in New Zealand and select required columns
nz_stations = (stations_with_country_state_name_inventory
.filter(F.col("COUNTRY_CODE")=="NZ")
.select("ID","STATION_NAME","LATITUDE","LONGITUDE"))
nz_stations.show(5)

+-----------+-------------------+--------+---------+
|         ID|       STATION_NAME|LATITUDE|LONGITUDE|
+-----------+-------------------+--------+---------+
|NZM00093110|  AUCKLAND AERO AWS|   -37.0|    174.8|
|NZ000936150| HOKITIKA AERODROME| -42.717|  170.983|
|NZM00093678|           KAIKOURA| -42.417|    173.7|
|NZ000093844|INVERCARGILL AIRPOR| -46.417|  168.333|
|NZ000093994| RAOUL ISL/KERMADEC|  -29.25| -177.917|
+-----------+-------------------+--------+---------+
only showing top 5 rows



In [139]:
#Haversine function
def get_distance(longit_a, latit_a, longit_b, latit_b):
# Transform to radians
    longit_a, latit_a, longit_b, latit_b = map(radians, [longit_a,  latit_a, longit_b, latit_b])
    dist_longit = longit_b - longit_a
    dist_latit = latit_b - latit_a
# Calculate area
    area = sin(dist_latit/2)**2 + cos(latit_a) * cos(latit_b) * sin(dist_longit/2)**2
# Calculate the central angle
    central_angle = 2 * asin(sqrt(area))
    radius = 6371
# Calculate Distance
    distance = central_angle * radius
    return abs(round(distance, 2))

In [140]:
#Create a udf to use it on our spark dataframe
udf_get_distance = F.udf(get_distance)

In [141]:
#Cross join
nz_station_pairs = (nz_stations.crossJoin(nz_stations).toDF(
"STATION_ID_A", "STATION_NAME_A", "LATITUDE_A", "LONGITUDE_A",
"STATION_ID_B", "STATION_NAME_B", "LATITUDE_B", "LONGITUDE_B"))

In [142]:
nz_station_pairs.show(5)

+------------+-----------------+----------+-----------+------------+-------------------+----------+-----------+
|STATION_ID_A|   STATION_NAME_A|LATITUDE_A|LONGITUDE_A|STATION_ID_B|     STATION_NAME_B|LATITUDE_B|LONGITUDE_B|
+------------+-----------------+----------+-----------+------------+-------------------+----------+-----------+
| NZM00093110|AUCKLAND AERO AWS|     -37.0|      174.8| NZM00093110|  AUCKLAND AERO AWS|     -37.0|      174.8|
| NZM00093110|AUCKLAND AERO AWS|     -37.0|      174.8| NZ000936150| HOKITIKA AERODROME|   -42.717|    170.983|
| NZM00093110|AUCKLAND AERO AWS|     -37.0|      174.8| NZM00093678|           KAIKOURA|   -42.417|      173.7|
| NZM00093110|AUCKLAND AERO AWS|     -37.0|      174.8| NZ000093844|INVERCARGILL AIRPOR|   -46.417|    168.333|
| NZM00093110|AUCKLAND AERO AWS|     -37.0|      174.8| NZ000093994| RAOUL ISL/KERMADEC|    -29.25|   -177.917|
+------------+-----------------+----------+-----------+------------+-------------------+----------+-----

In [143]:
#Remove repeated rows
nz_station_pairs =(nz_station_pairs.filter(
nz_station_pairs.STATION_ID_A != nz_station_pairs.STATION_ID_B))

In [144]:
#Apply udf to nz_station_pairs to add a new column ABS_DISTANCE.
nz_station_distance = (nz_station_pairs.withColumn("ABS_DISTANCE", udf_get_distance(
nz_station_pairs.LONGITUDE_A, nz_station_pairs.LATITUDE_A,
nz_station_pairs.LONGITUDE_B, nz_station_pairs.LATITUDE_B)
))

In [146]:
#cast the ABS_DISTANCE column as a double
cols_to_cast = ["LONGITUDE_A" ,"LATITUDE_A" ,"LONGITUDE_B" ,"LATITUDE_B","ABS_DISTANCE"]
nz_station_distance = nz_station_distance.select([F.col(c).cast('double') if c in cols_to_cast else c for c in nz_station_distance.columns])


In [147]:
#Absolute value of havershine distances
nz_station_distance.sort(F.col("ABS_DISTANCE")).show(10)

+------------+-------------------+----------+-----------+------------+-------------------+----------+-----------+------------+
|STATION_ID_A|     STATION_NAME_A|LATITUDE_A|LONGITUDE_A|STATION_ID_B|     STATION_NAME_B|LATITUDE_B|LONGITUDE_B|ABS_DISTANCE|
+------------+-------------------+----------+-----------+------------+-------------------+----------+-----------+------------+
| NZ000093417|    PARAPARAUMU AWS|     -40.9|    174.983| NZM00093439|WELLINGTON AERO AWS|   -41.333|      174.8|       50.53|
| NZM00093439|WELLINGTON AERO AWS|   -41.333|      174.8| NZ000093417|    PARAPARAUMU AWS|     -40.9|    174.983|       50.53|
| NZM00093678|           KAIKOURA|   -42.417|      173.7| NZM00093439|WELLINGTON AERO AWS|   -41.333|      174.8|      151.07|
| NZM00093439|WELLINGTON AERO AWS|   -41.333|      174.8| NZM00093678|           KAIKOURA|   -42.417|      173.7|      151.07|
| NZM00093781|  CHRISTCHURCH INTL|   -43.489|    172.532| NZ000936150| HOKITIKA AERODROME|   -42.717|    170.98

In [150]:
nz_station_distance.agg(F.min("ABS_DISTANCE")).show()

+-----------------+
|min(ABS_DISTANCE)|
+-----------------+
|            50.53|
+-----------------+



In [78]:
nz_station_distance.printSchema()

root
 |-- STATION_ID_A: string (nullable = true)
 |-- STATION_NAME_A: string (nullable = true)
 |-- LATITUDE_A: double (nullable = false)
 |-- LONGITUDE_A: double (nullable = false)
 |-- STATION_ID_B: string (nullable = true)
 |-- STATION_NAME_B: string (nullable = true)
 |-- LATITUDE_B: double (nullable = false)
 |-- LONGITUDE_B: double (nullable = false)
 |-- ABS_DISTANCE: double (nullable = true)



In [151]:
#Save the data to hdfs
nz_station_distance.write.csv("/user/abh89/spark/outputs/ghcnd/newzealand_stations_distance.csv")

In [152]:
#Copy to local
!hdfs dfs -copyToLocal /user/abh89/spark/outputs/ghcnd/newzealand_stations_distance.csv ~/spark/outputs/ghcnd/newzealand_stations_distance.csv

In [153]:
#Analysis Q3

In [184]:
!hdfs getconf -confKey "dfs.blocksize"

134217728


In [82]:
!hdfs dfs -du -h -v /data/ghcnd/daily/202*.csv.gz

SIZE     DISK_SPACE_CONSUMED_WITH_ALL_REPLICAS  FULL_PATH_NAME
154.5 M  1.2 G                                  /data/ghcnd/daily/2020.csv.gz
152.2 M  1.2 G                                  /data/ghcnd/daily/2021.csv.gz
84.1 M   672.9 M                                /data/ghcnd/daily/2022.csv.gz


In [83]:
#Number of blocks
!hdfs fsck /data/ghcnd/daily/2021.csv.gz -files -blocks

Connecting to namenode via http://masternode2:9870/fsck?ugi=abh89&files=1&blocks=1&path=%2Fdata%2Fghcnd%2Fdaily%2F2021.csv.gz
FSCK started by abh89 (auth:SIMPLE) from /192.168.40.11 for path /data/ghcnd/daily/2021.csv.gz at Sat Sep 10 02:25:55 NZST 2022

/data/ghcnd/daily/2021.csv.gz 159598394 bytes, replicated: replication=8, 2 block(s):  OK
0. BP-700027894-132.181.129.68-1626517177804:blk_1073787806_46986 len=134217728 Live_repl=8
1. BP-700027894-132.181.129.68-1626517177804:blk_1073787807_46987 len=25380666 Live_repl=8


Status: HEALTHY
 Number of data-nodes:	32
 Number of racks:		1
 Total dirs:			0
 Total symlinks:		0

Replicated Blocks:
 Total size:	159598394 B
 Total files:	1
 Total blocks (validated):	2 (avg. block size 79799197 B)
 Minimally replicated blocks:	2 (100.0 %)
 Over-replicated blocks:	0 (0.0 %)
 Under-replicated blocks:	0 (0.0 %)
 Mis-replicated blocks:		0 (0.0 %)
 Default replication factor:	4
 Average block replication:	8.0
 Missing blocks:

In [84]:
!hdfs fsck /data/ghcnd/daily/2022.csv.gz -files -blocks

Connecting to namenode via http://masternode2:9870/fsck?ugi=abh89&files=1&blocks=1&path=%2Fdata%2Fghcnd%2Fdaily%2F2022.csv.gz
FSCK started by abh89 (auth:SIMPLE) from /192.168.40.11 for path /data/ghcnd/daily/2022.csv.gz at Sat Sep 10 02:27:36 NZST 2022

/data/ghcnd/daily/2022.csv.gz 88195367 bytes, replicated: replication=8, 1 block(s):  OK
0. BP-700027894-132.181.129.68-1626517177804:blk_1073787808_46988 len=88195367 Live_repl=8


Status: HEALTHY
 Number of data-nodes:	32
 Number of racks:		1
 Total dirs:			0
 Total symlinks:		0

Replicated Blocks:
 Total size:	88195367 B
 Total files:	1
 Total blocks (validated):	1 (avg. block size 88195367 B)
 Minimally replicated blocks:	1 (100.0 %)
 Over-replicated blocks:	0 (0.0 %)
 Under-replicated blocks:	0 (0.0 %)
 Mis-replicated blocks:		0 (0.0 %)
 Default replication factor:	4
 Average block replication:	8.0
 Missing blocks:		0
 Corrupt blocks:		0
 Missing replicas:		0 (0.0 %)
 Blocks queued for replication:	0

Erasure Coded Block Groups:
 

In [85]:
# Check the path is right and the data can be loaded successfully

schema = StructType([
    StructField("ID", StringType(), True),                # Character Station code
    StructField("DATE", StringType(), True),              # Date Observation date formatted as YYYYMMDD
    StructField("ELEMENT", StringType(), True),           # Character Element type indicator
    StructField("VALUE", StringType(), True),             # Real Data value for ELEMENT
    StructField("MEASUREMENT", StringType(), True),  # Character Measurement Flag
    StructField("QUALITY", StringType(), True),      # Character Quality Flag
    StructField("SOURCE", StringType(), True),       # Character Source Flag
    StructField("TIME", StringType(), True),  # Time Observation time formatted as HHMM
])

In [86]:
#Observation count and number of partitions
daily_2021 = spark.read.csv("/data/ghcnd/daily/2021.csv.gz", schema=schema)
daily_2022 = spark.read.csv("/data/ghcnd/daily/2022.csv.gz", schema=schema)

print(f'2021 count = {daily_2021.count()}')
print(f'2022 count = {daily_2022.count()}')
print("")

print(f"2021 num partitions = {daily_2021.rdd.getNumPartitions()}")
print(f"2022 num partitions = {daily_2022.rdd.getNumPartitions()}")

2021 count = 35917254
2022 count = 19648456

2021 num partitions = 1
2022 num partitions = 1


In [89]:
years = [2014,2015,2016,2017,2018,2019,2020,2021,2022]

daily_selected = spark.read.csv([f"/data/ghcnd/daily/{year}.csv.gz" for year in years], schema=schema)
#daily_selected.cache()

print(f"daily_selected count          = {daily_selected.count()}")
print(f"daily_selected num partitions = {daily_selected.rdd.getNumPartitions()}")

daily_selected count          = 303501016
daily_selected num partitions = 9


In [90]:
daily_2022 = spark.read.csv("/data/ghcnd/daily/2022.csv.gz", schema=schema)
daily_2021 = spark.read.csv("/data/ghcnd/daily/2021.csv.gz", schema=schema)
daily_2020 = spark.read.csv("/data/ghcnd/daily/2020.csv.gz", schema=schema)
daily_2019 = spark.read.csv("/data/ghcnd/daily/2019.csv.gz", schema=schema)
daily_2018 = spark.read.csv("/data/ghcnd/daily/2018.csv.gz", schema=schema)
daily_2017 = spark.read.csv("/data/ghcnd/daily/2017.csv.gz", schema=schema)
daily_2016 = spark.read.csv("/data/ghcnd/daily/2016.csv.gz", schema=schema)
daily_2015 = spark.read.csv("/data/ghcnd/daily/2015.csv.gz", schema=schema)
daily_2014 = spark.read.csv("/data/ghcnd/daily/2014.csv.gz", schema=schema)


print(f'2022 count = {daily_2022.count()}')
print(f'2021 count = {daily_2021.count()}')
print(f'2020 count = {daily_2020.count()}')
print(f'2019 count = {daily_2019.count()}')
print(f'2018 count = {daily_2018.count()}')
print(f'2017 count = {daily_2017.count()}')
print(f'2016 count = {daily_2016.count()}')
print(f'2015 count = {daily_2015.count()}')
print(f'2014 count = {daily_2014.count()}')
print("")

print(f"2022 num partitions = {daily_2022.rdd.getNumPartitions()}")
print(f"2021 num partitions = {daily_2021.rdd.getNumPartitions()}")
print(f"2020 num partitions = {daily_2020.rdd.getNumPartitions()}")
print(f"2019 num partitions = {daily_2019.rdd.getNumPartitions()}")
print(f"2018 num partitions = {daily_2018.rdd.getNumPartitions()}")
print(f"2017 num partitions = {daily_2017.rdd.getNumPartitions()}")
print(f"2016 num partitions = {daily_2016.rdd.getNumPartitions()}")
print(f"2015 num partitions = {daily_2015.rdd.getNumPartitions()}")
print(f"2014 num partitions = {daily_2014.rdd.getNumPartitions()}")


2022 count = 19648456
2021 count = 35917254
2020 count = 36167120
2019 count = 35941498
2018 count = 36326971
2017 count = 34854073
2016 count = 35326496
2015 count = 34899014
2014 count = 34420134

2022 num partitions = 1
2021 num partitions = 1
2020 num partitions = 1
2019 num partitions = 1
2018 num partitions = 1
2017 num partitions = 1
2016 num partitions = 1
2015 num partitions = 1
2014 num partitions = 1


In [91]:
year_counts = (
    daily_selected
    .groupby([
        F.substring(F.col('DATE'), 0, 4).alias('YEAR')
    ])
    .agg(
        F.count(F.col('VALUE')).alias('COUNT'),
    )
)
#year_counts.cache()
year_counts.show()

+----+--------+
|YEAR|   COUNT|
+----+--------+
|2018|36326971|
|2016|35326496|
|2015|34899014|
|2021|35917254|
|2019|35941498|
|2017|34854073|
|2020|36167120|
|2014|34420134|
|2022|19648456|
+----+--------+



In [92]:
year_counts.rdd.getNumPartitions()

32

In [94]:
def get_partition_size(index, iterable_of_rows):
    return [
        (index, len(list(iterable_of_rows)))  # each partition is transformed into one row of (index, count)
    ]

schema = StructType([
  StructField("INDEX", StringType(), True),
  StructField("COUNT", StringType(), True),
])
partition_row_counts = spark.createDataFrame(
    year_counts.rdd.mapPartitionsWithIndex(get_partition_size),  # RDD[(partition index, partition row count)]
    schema=schema
)
partition_row_counts.show(32)

+-----+-----+
|INDEX|COUNT|
+-----+-----+
|    0|    0|
|    1|    0|
|    2|    1|
|    3|    0|
|    4|    0|
|    5|    0|
|    6|    0|
|    7|    0|
|    8|    2|
|    9|    2|
|   10|    2|
|   11|    0|
|   12|    1|
|   13|    0|
|   14|    0|
|   15|    0|
|   16|    0|
|   17|    0|
|   18|    0|
|   19|    0|
|   20|    0|
|   21|    0|
|   22|    0|
|   23|    0|
|   24|    0|
|   25|    0|
|   26|    1|
|   27|    0|
|   28|    0|
|   29|    0|
|   30|    0|
|   31|    0|
+-----+-----+



In [1]:
#Analysis Q4

In [77]:
#Defining schema for daily
schema_daily_all = StructType([
    StructField("ID", StringType(), True),    #Station code
    StructField("DATE", StringType(), True),    #Observation date formatted as YYYYMMDD
    StructField("ELEMENT", StringType(), True),   #Element type indicator
    StructField("VALUE", DoubleType(), True),     #Data value for ELEMENT
    StructField("MEASUREMENT_FLAG", StringType(), True),    #Measurement Flag
    StructField("QUALITY_FLAG", StringType(), True),    #Quality Flag
    StructField("SOURCE_FLAG", StringType(), True),    #Source Flag
    StructField("OBSERVATION_TIME", StringType(), True),    ##Observation time formatted as HHMM
])

In [78]:
#Loading daily into Spark
daily_all = (
    spark.read.format("com.databricks.spark.csv")
    .option("header", "false")
    .option("inferSchema", "false")
    .schema(schema_daily_all)
    .load("hdfs:///data/ghcnd/daily/*")
)
daily_all.show(10)

+-----------+--------+-------+------+----------------+------------+-----------+----------------+
|         ID|    DATE|ELEMENT| VALUE|MEASUREMENT_FLAG|QUALITY_FLAG|SOURCE_FLAG|OBSERVATION_TIME|
+-----------+--------+-------+------+----------------+------------+-----------+----------------+
|CA002303986|20100101|   TMAX| 205.0|            null|           G|          C|            null|
|CA002303986|20100101|   TMIN|-300.0|            null|        null|          C|            null|
|CA002303986|20100101|   PRCP|   4.0|            null|        null|          C|            null|
|CA002303986|20100101|   SNOW|   4.0|            null|        null|          C|            null|
|CA002303986|20100101|   SNWD|   0.0|            null|           I|          C|            null|
|US1FLSL0019|20100101|   PRCP|   0.0|               T|        null|          N|            null|
|ASN00037003|20100101|   PRCP|   0.0|            null|        null|          a|            null|
|US1AZMR0019|20100101|   PRCP|

In [79]:
daily_all.count()

3018826504

In [4]:
#Core elements
core_elements

['PRCP', 'SNOW', 'SNWD', 'TMAX', 'TMIN']

In [80]:
#Filter observations containing five core elements
import functools
import operator
daily_core = (daily_all
.filter(functools.reduce(operator.or_, [F.col("ELEMENT") == x for x in core_elements]))
.select("ID","DATE","ELEMENT","VALUE","MEASUREMENT_FLAG","QUALITY_FLAG","SOURCE_FLAG","OBSERVATION_TIME"))
daily_core.show(20)

+-----------+--------+-------+------+----------------+------------+-----------+----------------+
|         ID|    DATE|ELEMENT| VALUE|MEASUREMENT_FLAG|QUALITY_FLAG|SOURCE_FLAG|OBSERVATION_TIME|
+-----------+--------+-------+------+----------------+------------+-----------+----------------+
|CA002303986|20100101|   TMAX| 205.0|            null|           G|          C|            null|
|CA002303986|20100101|   TMIN|-300.0|            null|        null|          C|            null|
|CA002303986|20100101|   PRCP|   4.0|            null|        null|          C|            null|
|CA002303986|20100101|   SNOW|   4.0|            null|        null|          C|            null|
|CA002303986|20100101|   SNWD|   0.0|            null|           I|          C|            null|
|US1FLSL0019|20100101|   PRCP|   0.0|               T|        null|          N|            null|
|ASN00037003|20100101|   PRCP|   0.0|            null|        null|          a|            null|
|US1AZMR0019|20100101|   PRCP|

In [232]:
daily_core.select("ID").where(F.col('ELEMENT')=='PRCP').count()

1048156273

In [81]:
#core element count
core_counts = (
    daily_core
    .groupby([
        F.col('ELEMENT').alias('CORE_ELEMENT')
    ])
    .agg(
        F.count(F.col("ID")).cast(IntegerType()).alias('COUNT'),
    )
)
#year_counts.cache()
core_counts.show()

+------------+----------+
|CORE_ELEMENT|     COUNT|
+------------+----------+
|        SNOW| 344268930|
|        SNWD| 290998195|
|        TMIN| 445687425|
|        PRCP|1048156273|
|        TMAX| 447084093|
+------------+----------+



In [228]:
daily_core.printSchema()

root
 |-- ID: string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- ELEMENT: string (nullable = true)
 |-- VALUE: double (nullable = true)
 |-- MEASUREMENT FLAG: string (nullable = true)
 |-- QUALITY FLAG: string (nullable = true)
 |-- SOURCE FLAG: string (nullable = true)
 |-- OBSERVATION TIME: string (nullable = true)



In [98]:
#Filter TMIN TMAX from daily
daily_minmax = (daily_core
.filter((F.col("ELEMENT")=="TMAX")|(F.col("ELEMENT")=="TMIN"))                 
.select("ID","DATE","ELEMENT","VALUE","MEASUREMENT_FLAG","QUALITY_FLAG","SOURCE_FLAG","OBSERVATION_TIME")
)
daily_minmax.show(20)

+-----------+--------+-------+------+----------------+------------+-----------+----------------+
|         ID|    DATE|ELEMENT| VALUE|MEASUREMENT_FLAG|QUALITY_FLAG|SOURCE_FLAG|OBSERVATION_TIME|
+-----------+--------+-------+------+----------------+------------+-----------+----------------+
|CA002303986|20100101|   TMAX| 205.0|            null|           G|          C|            null|
|CA002303986|20100101|   TMIN|-300.0|            null|        null|          C|            null|
|USC00178998|20100101|   TMAX|   0.0|            null|        null|          0|            1800|
|USC00178998|20100101|   TMIN| -56.0|            null|        null|          0|            1800|
|NOE00133566|20100101|   TMAX|   2.0|            null|        null|          E|            null|
|NOE00133566|20100101|   TMIN| -84.0|            null|        null|          E|            null|
|USC00242347|20100101|   TMAX|  33.0|            null|        null|          0|            0800|
|USC00242347|20100101|   TMIN|

In [99]:
#Create a temporary table to count TMIN and TMAX against each ID and Date
temp = (
    daily_minmax
    .groupby(["ID","DATE"])
    .agg(
        F.collect_set(F.col("ELEMENT")).alias("ELEMENTS_DISTINCT")
    )
    .select( 
        F.col("ID"),
        F.col("DATE"),
        F.col("ELEMENTS_DISTINCT"),
        F.size(F.col("ELEMENTS_DISTINCT")).alias("NUM_ELEMENTS"),
        F.size(F.array_intersect(F.col("ELEMENTS_DISTINCT"), F.array([F.lit('TMAX')]))).alias("NUM_TMAX"),
        F.size(F.array_intersect(F.col("ELEMENTS_DISTINCT"), F.array([F.lit('TMIN')]))).alias("NUM_TMIN"),
    )
)
temp.show(20)

+-----------+--------+-----------------+------------+--------+--------+
|         ID|    DATE|ELEMENTS_DISTINCT|NUM_ELEMENTS|NUM_TMAX|NUM_TMIN|
+-----------+--------+-----------------+------------+--------+--------+
|ACW00011604|19490314|     [TMAX, TMIN]|           2|       1|       1|
|ACW00011604|19490316|     [TMAX, TMIN]|           2|       1|       1|
|ACW00011604|19490401|     [TMAX, TMIN]|           2|       1|       1|
|ACW00011604|19490409|     [TMAX, TMIN]|           2|       1|       1|
|ACW00011604|19490506|     [TMAX, TMIN]|           2|       1|       1|
|ACW00011604|19490510|     [TMAX, TMIN]|           2|       1|       1|
|ACW00011604|19490520|     [TMAX, TMIN]|           2|       1|       1|
|ACW00011604|19490722|     [TMAX, TMIN]|           2|       1|       1|
|AE000041196|19440325|     [TMAX, TMIN]|           2|       1|       1|
|AE000041196|19440328|     [TMAX, TMIN]|           2|       1|       1|
|AE000041196|19440530|     [TMAX, TMIN]|           2|       1|  

In [100]:
#TMIN don't have corresponding TMAX
temp.select('ID').where(((F.col('NUM_TMAX')==0)&(F.col('NUM_TMIN')==1))).count()

8848299

In [101]:
temp.select('ID').where(((F.col('NUM_TMAX')==0)&(F.col('NUM_TMIN')==1))).distinct().count()

27678

In [102]:
#Extract country code from ID
daily_core = daily_core.withColumn("COUNTRY_CODE", F.col("ID")[0:2])
daily_core.show(10)

+-----------+--------+-------+------+----------------+------------+-----------+----------------+------------+
|         ID|    DATE|ELEMENT| VALUE|MEASUREMENT_FLAG|QUALITY_FLAG|SOURCE_FLAG|OBSERVATION_TIME|COUNTRY_CODE|
+-----------+--------+-------+------+----------------+------------+-----------+----------------+------------+
|CA002303986|20100101|   TMAX| 205.0|            null|           G|          C|            null|          CA|
|CA002303986|20100101|   TMIN|-300.0|            null|        null|          C|            null|          CA|
|CA002303986|20100101|   PRCP|   4.0|            null|        null|          C|            null|          CA|
|CA002303986|20100101|   SNOW|   4.0|            null|        null|          C|            null|          CA|
|CA002303986|20100101|   SNWD|   0.0|            null|           I|          C|            null|          CA|
|US1FLSL0019|20100101|   PRCP|   0.0|               T|        null|          N|            null|          US|
|ASN000370

In [103]:
#Filter by TMAX TMIN and NZ
#Divide value by 10 and split date to year, month and day
daily_minmax_subset = (daily_core
.filter(((F.col("ELEMENT")=="TMAX")|(F.col("ELEMENT")=="TMIN"))&(F.col("COUNTRY_CODE")=="NZ"))                 
.select("ID",F.substring(F.col('DATE'), 0, 4).alias('YEAR'),F.substring(F.col('DATE'), 5, 2).alias('MONTH'),F.substring(F.col('DATE'), 7, 2).alias('DAY'),
        "ELEMENT",(F.col("VALUE")/10).alias("TEMP_C"),"COUNTRY_CODE")
.sort(F.col("ID"))
)
daily_minmax_subset.show(20)

+-----------+----+-----+---+-------+------+------------+
|         ID|YEAR|MONTH|DAY|ELEMENT|TEMP_C|COUNTRY_CODE|
+-----------+----+-----+---+-------+------+------------+
|NZ000093012|2004|   01| 14|   TMIN|  15.1|          NZ|
|NZ000093012|2004|   01| 12|   TMIN|  14.9|          NZ|
|NZ000093012|2004|   01| 08|   TMIN|  17.5|          NZ|
|NZ000093012|2004|   01| 02|   TMIN|  13.0|          NZ|
|NZ000093012|2004|   01| 04|   TMIN|  16.9|          NZ|
|NZ000093012|2004|   01| 06|   TMIN|  16.4|          NZ|
|NZ000093012|2004|   01| 14|   TMAX|  26.2|          NZ|
|NZ000093012|2004|   01| 20|   TMAX|  20.7|          NZ|
|NZ000093012|2004|   01| 11|   TMAX|  26.5|          NZ|
|NZ000093012|2004|   01| 03|   TMIN|  13.8|          NZ|
|NZ000093012|2004|   01| 12|   TMAX|  27.7|          NZ|
|NZ000093012|2004|   01| 05|   TMIN|  16.9|          NZ|
|NZ000093012|2004|   01| 13|   TMAX|  26.6|          NZ|
|NZ000093012|2004|   01| 13|   TMIN|  17.7|          NZ|
|NZ000093012|2004|   01| 07|   

In [106]:
#Copy the table to hdfs
daily_minmax_subset.write.csv("/user/abh89/spark/outputs/ghcnd/newzealand_stations_temperature.csv")

In [126]:
#Count number of rows 
!wc -l ~/spark/outputs/ghcnd/newzealand_stations_temperature.csv/*

   20759 /users/home/abh89/spark/outputs/ghcnd/newzealand_stations_temperature.csv/part-00000-5159e788-ed16-4188-bb1d-76e43d84fe12-c000.csv
   41327 /users/home/abh89/spark/outputs/ghcnd/newzealand_stations_temperature.csv/part-00001-5159e788-ed16-4188-bb1d-76e43d84fe12-c000.csv
   33897 /users/home/abh89/spark/outputs/ghcnd/newzealand_stations_temperature.csv/part-00002-5159e788-ed16-4188-bb1d-76e43d84fe12-c000.csv
   51253 /users/home/abh89/spark/outputs/ghcnd/newzealand_stations_temperature.csv/part-00003-5159e788-ed16-4188-bb1d-76e43d84fe12-c000.csv
   53263 /users/home/abh89/spark/outputs/ghcnd/newzealand_stations_temperature.csv/part-00004-5159e788-ed16-4188-bb1d-76e43d84fe12-c000.csv
   54500 /users/home/abh89/spark/outputs/ghcnd/newzealand_stations_temperature.csv/part-00005-5159e788-ed16-4188-bb1d-76e43d84fe12-c000.csv
   39732 /users/home/abh89/spark/outputs/ghcnd/newzealand_stations_temperature.csv/part-00006-5159e788-ed16-4188-bb1d-76e43d84fe12-c000.csv
   39422 /users/home

In [107]:
#Copy from hdfs to local
!hdfs dfs -copyToLocal /user/abh89/spark/outputs/ghcnd/newzealand_stations_temperature.csv ~/spark/outputs/ghcnd/newzealand_stations_temperature.csv

In [104]:
#Count number of rows
daily_minmax_subset.count()

474654

In [105]:
#Summary for New Zealand
subset_minmax =(daily_minmax_subset
.groupby([
        "COUNTRY_CODE"
    ])
    .agg(
        F.min(F.col('YEAR')).alias('START_YEAR'),
        F.max(F.col('YEAR')).alias('END_YEAR'),
        F.count(F.col("ID")).cast(IntegerType()).alias("COUNT")
    )
    .select(
        F.col("COUNTRY_CODE"),
        F.col("START_YEAR"),
        F.col("END_YEAR"),
        F.col("COUNT")
    )
)
subset_minmax.show(20)

+------------+----------+--------+------+
|COUNTRY_CODE|START_YEAR|END_YEAR| COUNT|
+------------+----------+--------+------+
|          NZ|      1940|    2022|474654|
+------------+----------+--------+------+



In [152]:
subset_minmax.printSchema()

root
 |-- ELEMENT: string (nullable = true)
 |-- FIRST YEAR: integer (nullable = true)
 |-- LAST YEAR: integer (nullable = true)
 |-- COUNT: integer (nullable = false)



In [108]:
#Precipitation
daily_rainfall_subset = (daily_core
.filter(((F.col("ELEMENT")=="PRCP")))                   
.select("ID",F.substring(F.col('DATE'), 0, 4).alias('YEAR'),F.substring(F.col('DATE'), 5, 2).alias('MONTH'),F.substring(F.col('DATE'), 7, 2).alias('DAY'),
        "ELEMENT",(F.col("VALUE")/10).alias("RAINFALL_MM"),"COUNTRY_CODE")
.sort(F.col("ID"))
)
daily_rainfall_subset.show(20)

+-----------+----+-----+---+-------+-----------+------------+
|         ID|YEAR|MONTH|DAY|ELEMENT|RAINFALL_MM|COUNTRY_CODE|
+-----------+----+-----+---+-------+-----------+------------+
|ACW00011604|1949|   01| 27|   PRCP|        4.1|          AC|
|ACW00011604|1949|   01| 10|   PRCP|        0.0|          AC|
|ACW00011604|1949|   02| 03|   PRCP|        0.0|          AC|
|ACW00011604|1949|   01| 18|   PRCP|        0.0|          AC|
|ACW00011604|1949|   01| 20|   PRCP|        0.0|          AC|
|ACW00011604|1949|   01| 22|   PRCP|        0.0|          AC|
|ACW00011604|1949|   01| 26|   PRCP|        1.5|          AC|
|ACW00011604|1949|   01| 16|   PRCP|        0.0|          AC|
|ACW00011604|1949|   01| 07|   PRCP|        0.0|          AC|
|ACW00011604|1949|   01| 19|   PRCP|        0.3|          AC|
|ACW00011604|1949|   01| 08|   PRCP|        0.0|          AC|
|ACW00011604|1949|   01| 21|   PRCP|        5.1|          AC|
|ACW00011604|1949|   01| 12|   PRCP|        0.0|          AC|
|ACW0001

In [115]:
rainfall.count()

17161

In [109]:
#Average rainfall by country and year
rainfall =(daily_rainfall_subset
.groupby([
        "COUNTRY_CODE","YEAR"
    ])
    .agg(
        F.count(F.col("ID")).cast(IntegerType()).alias("COUNT"),
        F.min(F.col('RAINFALL_MM')).alias('MIN_RAINFALL'),
        F.max(F.col('RAINFALL_MM')).alias('MAX_RAINFALL'),
        F.mean(F.col('RAINFALL_MM')).alias('AVERAGE_RAINFALL')
    )
    .select(
        F.col("COUNTRY_CODE"),
        F.col("YEAR"),
        F.col("COUNT"),
        F.col("MIN_RAINFALL"),
        F.col("MAX_RAINFALL"),
        F.col("AVERAGE_RAINFALL")
    )
    .join(
        countries_renamed,
        on="COUNTRY_CODE",
        how="left"
      )
    .sort(F.col("AVERAGE_RAINFALL").desc())
)
rainfall.show(20)

+------------+----+-----+------------+------------+------------------+--------------------+
|COUNTRY_CODE|YEAR|COUNT|MIN_RAINFALL|MAX_RAINFALL|  AVERAGE_RAINFALL|        COUNTRY_NAME|
+------------+----+-----+------------+------------+------------------+--------------------+
|          EK|2000|    1|       436.1|       436.1|             436.1|   Equatorial Guinea|
|          DR|1975|    1|       341.4|       341.4|             341.4| Dominican Republic |
|          LA|1974|    2|         0.0|       496.1|            248.05|               Laos |
|          BH|1978|    7|         0.0|       490.0| 224.4714285714286|              Belize|
|          NN|1979|   10|         0.0|       493.0|             196.7|        Sint Maarten|
|          CS|1974|    2|         0.0|       364.0|             182.0|         Costa Rica |
|          BH|1979|   11|         0.0|       495.0|175.55454545454543|              Belize|
|          NS|1973|    3|         0.0|       257.0|             171.0|          

In [111]:
#Save rainfall data to hdfs
rainfall.repartition(1).write.csv("/user/abh89/spark/outputs/ghcnd/rainfall.csv")

In [112]:
#Save from hdfs to local
!hdfs dfs -copyToLocal /user/abh89/spark/outputs/ghcnd/rainfall.csv ~/spark/outputs/ghcnd/rainfall.csv

In [13]:
import os
import pandas as pd

columns = ["ID","YEAR","MONTH","DAY","ELEMENT","TEMP_C","COUNTRY_CODE"]

parts = []
for file in os.listdir(os.path.expanduser("~/spark/outputs/ghcnd/newzealand_stations_temperature.csv/")):
    if file == "_SUCCESS":
        continue
    try:
        data_temperature = pd.read_csv(f"~/spark/outputs/ghcnd/newzealand_stations_temperature.csv/{file}", header=None, names=columns)
        parts.append(data_temperature)
    except pd.io.common.EmptyDataError:
        pass

data_temperature = pd.concat(parts).sort_values(["ID"])

data_temperature.head(20)

Unnamed: 0,ID,YEAR,MONTH,DAY,ELEMENT,TEMP_C,COUNTRY_CODE
16203,NZ000093012,2003,10,15,TMAX,18.0,NZ
13844,NZ000093012,1998,1,31,TMIN,16.5,NZ
13843,NZ000093012,1998,1,31,TMAX,27.3,NZ
13842,NZ000093012,1998,1,30,TMIN,16.8,NZ
13841,NZ000093012,1998,1,30,TMAX,26.3,NZ
13840,NZ000093012,1998,1,29,TMIN,19.7,NZ
13839,NZ000093012,1998,1,29,TMAX,24.7,NZ
13838,NZ000093012,1998,1,28,TMIN,19.9,NZ
13845,NZ000093012,1998,2,1,TMAX,28.6,NZ
13837,NZ000093012,1998,1,28,TMAX,26.7,NZ


In [35]:
import numpy as np
temperature_NZ = data_temperature.pivot_table(values='TEMP_C', index='YEAR', columns='ELEMENT',aggfunc=np.mean)
print(temperature_NZ)

ELEMENT       TMAX       TMIN
YEAR                         
1940     20.103010  14.803679
1941     17.051554  11.772527
1942     19.696328  10.485616
1943     15.899119  10.458767
1944     15.723814   9.830510
1945     15.722831   9.713242
1946     15.629589   9.760913
1947     15.632968   9.869680
1948     15.515091   9.072387
1949     15.414211   8.567587
1950     15.432237   7.410033
1951     15.173779   7.206195
1952     15.366319   7.556503
1953     15.326703   7.387884
1954     15.787610   7.636374
1955     15.909151   7.892932
1956     15.990110   8.137383
1957     15.388676   7.924292
1958     15.362466   7.827971
1959     15.354271   7.593239
1960     15.501252   7.544467
1961     15.179139   7.548981
1962     16.285826   8.279705
1963     15.484896   7.383188
1964     15.586311   7.573317
1965     15.351930   7.370930
1966     15.938278   7.909321
1967     16.020385   8.016456
1968     16.124380   8.738690
1969     15.897413   7.871758
...            ...        ...
1993     1

In [43]:
stations_NZ = data_temperature['ID'].unique()

In [22]:
data_temperature.dtypes

ID               object
YEAR              int64
MONTH             int64
DAY               int64
ELEMENT          object
TEMP_C          float64
COUNTRY_CODE     object
dtype: object

In [63]:
import pandas as pd

columns = ["COUNTRY_CODE","YEAR","COUNT","MIN_RAINFALL","MAX_RAINFALL","AVERAGE_RAINFALL","COUNTRY_NAME"]

parts = []
for file in os.listdir(os.path.expanduser("~/spark/outputs/ghcnd/rainfall.csv/")):
    if file == "_SUCCESS":
        continue
    try:
        data_rainfall = pd.read_csv(f"~/spark/outputs/ghcnd/rainfall.csv/{file}", header=None, names=columns)
        parts.append(data_rainfall)
    except pd.io.common.EmptyDataError:
        pass

data_rainfall.sort_values(["COUNTRY_CODE"]).head(20)

Unnamed: 0,COUNTRY_CODE,YEAR,COUNT,MIN_RAINFALL,MAX_RAINFALL,AVERAGE_RAINFALL,COUNTRY_NAME
14867,AC,1966,243,0.0,42.7,1.571605,Antigua and Barbuda
1565,AC,1962,363,0.0,103.1,3.091736,Antigua and Barbuda
13985,AC,1968,245,0.0,47.0,1.748571,Antigua and Barbuda
10234,AC,1958,334,0.0,79.8,3.56497,Antigua and Barbuda
14396,AC,1967,59,0.0,77.2,1.667797,Antigua and Barbuda
5187,AC,1970,32,0.0,15.5,0.684375,Antigua and Barbuda
12135,AC,1961,334,0.0,41.9,2.397904,Antigua and Barbuda
1646,AC,1959,365,0.0,109.0,3.056164,Antigua and Barbuda
10634,AC,1969,313,0.0,79.0,3.353355,Antigua and Barbuda
8027,AC,1957,122,0.0,56.1,5.322951,Antigua and Barbuda


In [121]:
data_rainfall.count()

COUNTRY_CODE        17161
YEAR                17161
COUNT               17161
MIN_RAINFALL        17161
MAX_RAINFALL        17161
AVERAGE_RAINFALL    17161
COUNTRY_NAME        17161
dtype: int64

In [70]:
rainfall_2021 = data_rainfall[data_rainfall["YEAR"] == 2021]
#rainfall_2021 =rainfall_2021["COUNTRY_NAME","AVERAGE_RAINFALL"]
rainfall_2021.head()

Unnamed: 0,COUNTRY_CODE,YEAR,COUNT,MIN_RAINFALL,MAX_RAINFALL,AVERAGE_RAINFALL,COUNTRY_NAME
258,GV,2021,144,0.0,214.9,17.975694,Guinea
289,ID,2021,538,0.0,340.1,16.664684,Indonesia
297,CB,2021,10,0.0,52.1,16.42,Cambodia
309,ZA,2021,5,0.0,39.1,16.06,Zambia
329,BM,2021,667,0.0,224.0,15.587856,Burma


In [73]:
rainfall_2021 = rainfall_2021[["COUNTRY_NAME","AVERAGE_RAINFALL"]]
rainfall_2021.head()

Unnamed: 0,COUNTRY_NAME,AVERAGE_RAINFALL
258,Guinea,17.975694
289,Indonesia,16.664684
297,Cambodia,16.42
309,Zambia,16.06
329,Burma,15.587856


In [193]:
# Run this cell before closing the notebook or kill your spark application by hand using the link in the Spark UI

stop_spark()