### Spark notebook ###

This notebook will only work in a Jupyter session running on `mathmadslinux2p`.

You can start your own Jupyter session on `mathmadslinux2p` and open this notebook in Chrome on the MADS Windows server by

**Steps**

1. Login to the MADS Windows server using https://mathportal.canterbury.ac.nz/.
2. Download or copy this notebook to your home directory.
3. Open powershell and run `ssh mathmadslinux2p`.
4. Run `start_pyspark_notebook` or `/opt/anaconda3/bin/jupyter-notebook --ip 132.181.129.68 --port $((8000 + $((RANDOM % 999))))`.
5. Copy / paste the url provided in the shell window into Chrome on the MADS Windows server.
6. Open the notebook from the Jupyter root directory (which is your home directory).
7. Run `start_spark()` to start a spark session in the notebook.
8. Run `stop_spark()` before closing the notebook or kill your spark application by hand using the link in the Spark UI.

In [1]:
# Run this cell to import pyspark and to define start_spark() and stop_spark()

import findspark

findspark.init()

import getpass
import pandas
import pyspark
import random
import re

from IPython.display import display, HTML
from pyspark import SparkContext
from pyspark.sql import SparkSession


# Functions used below

def username():
    """Get username with any domain information removed.
    """

    return re.sub('@.*', '', getpass.getuser())


def dict_to_html(d):
    """Convert a Python dictionary into a two column table for display.
    """

    html = []

    html.append(f'<table width="100%" style="width:100%; font-family: monospace;">')
    for k, v in d.items():
        html.append(f'<tr><td style="text-align:left;">{k}</td><td>{v}</td></tr>')
    html.append(f'</table>')

    return ''.join(html)


def show_as_html(df, n=20):
    """Leverage existing pandas jupyter integration to show a spark dataframe as html.
    
    Args:
        n (int): number of rows to show (default: 20)
    """

    display(df.limit(n).toPandas())

    
def display_spark():
    """Display the status of the active Spark session if one is currently running.
    """
    
    if 'spark' in globals() and 'sc' in globals():

        name = sc.getConf().get("spark.app.name")
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:green">active</span></b>, look for <code>{name}</code> under the running applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'<li><a href="{sc.uiWebUrl}" target="_blank">Spark Application UI</a></li>',
            f'</ul>',
            f'<p><b>Config</b></p>',
            dict_to_html(dict(sc.getConf().getAll())),
            f'<p><b>Notes</b></p>',
            f'<ul>',
            f'<li>The spark session <code>spark</code> and spark context <code>sc</code> global variables have been defined by <code>start_spark()</code>.</li>',
            f'<li>Please run <code>stop_spark()</code> before closing the notebook or restarting the kernel or kill <code>{name}</code> by hand using the link in the Spark UI.</li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))
        
    else:
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:red">stopped</span></b>, confirm that <code>{username() + " (jupyter)"}</code> is under the completed applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))


# Functions to start and stop spark

def start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1):
    """Start a new Spark session and define globals for SparkSession (spark) and SparkContext (sc).
    
    Args:
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): worker memory (default: 1)
        master_memory (float): master memory (default: 1)
    """

    global spark
    global sc

    user = username()
    
    cores = executor_instances * executor_cores
    partitions = cores * 4
    port = 4000 + random.randint(1, 999)

    spark = (
        SparkSession.builder
        .master("spark://masternode2:7077")
        .config("spark.driver.extraJavaOptions", f"-Dderby.system.home=/tmp/{user}/spark/")
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.executor.instances", str(executor_instances))
        .config("spark.executor.cores", str(executor_cores))
        .config("spark.cores.max", str(cores))
        .config("spark.executor.memory", f"{worker_memory}g")
        .config("spark.driver.memory", f"{master_memory}g")
        .config("spark.driver.maxResultSize", "0")
        .config("spark.sql.shuffle.partitions", str(partitions))
        .config("spark.ui.port", str(port))
        .appName(user + " (jupyter)")
        .getOrCreate()
    )
    sc = SparkContext.getOrCreate()
    
    display_spark()

    
def stop_spark():
    """Stop the active Spark session and delete globals for SparkSession (spark) and SparkContext (sc).
    """

    global spark
    global sc

    if 'spark' in globals() and 'sc' in globals():

        spark.stop()

        del spark
        del sc

    display_spark()


# Make css changes to improve spark output readability

html = [
    '<style>',
    'pre { white-space: pre !important; }',
    'table.dataframe td { white-space: nowrap !important; }',
    'table.dataframe thead th:first-child, table.dataframe tbody th { display: none; }',
    '</style>',
]
display(HTML(''.join(html)))

### Example notebook ###

The code below provides a template for how you would use a notebook to start spark, run some code, and then stop spark.

**Steps**

- Run `start_spark()` to start a spark session in the notebook (only change the default resources when advised to do so for an exercise or assignment)
- Write and run code interactively, creating additional cells as needed.
- Run `stop_spark()` before closing the notebook or kill your spark application by hand using the link in the [Spark UI](http://mathmadslinux2p.canterbury.ac.nz:8080/).

In [2]:
# Run this cell to start a spark session in this notebook

start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1)

0,1
spark.dynamicAllocation.enabled,false
spark.driver.memory,4g
spark.driver.port,46813
spark.executor.memory,4g
spark.sql.warehouse.dir,file:/users/home/dcp31/assignment_1/spark-warehouse
spark.master,spark://masternode2:7077
spark.app.name,dcp31 (jupyter)
spark.executor.id,driver
spark.app.startTime,1714528529485
spark.executor.instances,8


In [3]:
# Write your imports here or insert cells below

from pyspark.sql import functions as F
from pyspark.sql.types import *


### Question 1

In [4]:
!hdfs dfs -ls /data/ghcnd/

Found 5 items
drwxr-xr-x   - jsw93 supergroup          0 2024-03-19 00:11 /data/ghcnd/daily
-rwxr-xr-x   8 jsw93 supergroup       3659 2024-03-11 00:45 /data/ghcnd/ghcnd-countries.txt
-rwxr-xr-x   8 jsw93 supergroup   34380032 2024-03-11 00:39 /data/ghcnd/ghcnd-inventory.txt
-rwxr-xr-x   8 jsw93 supergroup       1086 2024-03-11 00:45 /data/ghcnd/ghcnd-states.txt
-rwxr-xr-x   8 jsw93 supergroup   10834968 2024-03-11 00:39 /data/ghcnd/ghcnd-stations.txt


In [5]:
!hdfs dfs -ls /data/ghcnd/daily/

Found 263 items
-rw-r--r--   8 jsw93 supergroup     517706 2024-03-18 23:56 /data/ghcnd/daily/1750.csv.gz
-rw-r--r--   8 jsw93 supergroup       3358 2024-03-18 23:57 /data/ghcnd/daily/1763.csv.gz
-rw-r--r--   8 jsw93 supergroup       3327 2024-03-18 23:54 /data/ghcnd/daily/1764.csv.gz
-rw-r--r--   8 jsw93 supergroup       3335 2024-03-18 23:54 /data/ghcnd/daily/1765.csv.gz
-rw-r--r--   8 jsw93 supergroup       3344 2024-03-18 23:49 /data/ghcnd/daily/1766.csv.gz
-rw-r--r--   8 jsw93 supergroup       3356 2024-03-18 23:56 /data/ghcnd/daily/1767.csv.gz
-rw-r--r--   8 jsw93 supergroup       3325 2024-03-18 23:53 /data/ghcnd/daily/1768.csv.gz
-rw-r--r--   8 jsw93 supergroup       3418 2024-03-18 23:54 /data/ghcnd/daily/1769.csv.gz
-rw-r--r--   8 jsw93 supergroup       3357 2024-03-18 23:56 /data/ghcnd/daily/1770.csv.gz
-rw-r--r--   8 jsw93 supergroup       3373 2024-03-18 23:56 /data/ghcnd/daily/1771.csv.gz
-rw-r--r--   8 jsw93 supergroup       3419 2024-03-18 23:55 /data/ghcnd/daily/1772.c

In [6]:
!hdfs dfs -count /data/ghcnd/

           2          267        13324160113 /data/ghcnd


In [7]:
!hdfs dfs -count /data/ghcnd/daily/

           1          263        13278940368 /data/ghcnd/daily


In [4]:
# for exploring 1750 data - why is it much larger than the other files? 

schema_daily = StructType([
    StructField("station_id", StringType(), True),
    StructField("date", IntegerType(), True),
    StructField("element", StringType(), True),
    StructField("value", IntegerType(), True),
    StructField("m_flag", StringType(), True),
    StructField("q_flag", StringType(), True),
    StructField("s_flag", StringType(), True),
    StructField("time", StringType(), True)
])

daily_1750 = (
    spark.read.format("com.databricks.spark.csv")
    .option("header", "false")
    .option("inferSchema", "false")
    .schema(schema_daily)
    .load("hdfs:///data/ghcnd/daily/1750.csv.gz")
)
    
show_as_html(daily_1750, 10)

Unnamed: 0,station_id,date,element,value,m_flag,q_flag,s_flag,time
0,ASN00002061,17500201,PRCP,56,,,a,
1,ASN00003014,17500201,PRCP,0,,,a,
2,ASN00003059,17500201,PRCP,0,,,a,
3,ASN00003088,17500201,PRCP,0,,,a,
4,ASN00007001,17500201,PRCP,0,,,a,
5,ASN00009015,17500201,PRCP,0,,,a,
6,ASN00009193,17500201,TMIN,187,,,a,
7,ASN00009193,17500201,PRCP,0,,,a,
8,ASN00009500,17500201,DATX,2,,,a,
9,ASN00009500,17500201,MDTX,210,,,a,


In [39]:
print(daily_1750.count())

#print(daily_1750.groupBy(F.col("date")).count().count()) # number of dates included

daily_1750.groupBy(F.col("date")).count().orderBy(F.asc("date")).show(66) 

print(daily_1750.select("station_id").distinct().count())

# it is only 1750 data going by the date column. But could this be incorrect? It is only Feb-April

126775
+--------+-----+
|    date|count|
+--------+-----+
|17500201|  262|
|17500202|  283|
|17500203|  225|
|17500204|  232|
|17500205|  237|
|17500206|  200|
|17500207|  262|
|17500208|  372|
|17500209|  231|
|17500210|  202|
|17500211|  194|
|17500212|  192|
|17500213|  206|
|17500214|  220|
|17500215|  271|
|17500216|  185|
|17500217|  177|
|17500218|  182|
|17500219|  135|
|17500220|  129|
|17500221|  142|
|17500222|  136|
|17500223|  121|
|17500224|  113|
|17500225|   81|
|17500226|  135|
|17500227|   50|
|17500228|  121|
|17500301| 3466|
|17500302| 3442|
|17500303| 3392|
|17500304| 3428|
|17500305| 3532|
|17500306| 3442|
|17500307| 3446|
|17500308| 3433|
|17500309| 3419|
|17500310| 3418|
|17500311| 3438|
|17500312| 3500|
|17500313| 3436|
|17500314| 3421|
|17500315| 3374|
|17500316| 3364|
|17500317| 3326|
|17500318| 3343|
|17500319| 3450|
|17500320| 3347|
|17500321| 3350|
|17500322| 3327|
|17500323| 3299|
|17500324| 3213|
|17500325| 3266|
|17500326| 3375|
|17500327| 3213|
|175003

In [12]:
# could they be duplicates? Group by all vars and see if there is more than one. 

duplicates = (
    daily_1750.groupBy("station_id", "date", "element", "value", "m_flag", "q_flag", "s_flag", "time")
    .agg(F.count("*").alias("count"))
    .filter(F.col("count") > 1)
)

In [13]:
# none
duplicates.show()

+----------+----+-------+-----+------+------+------+----+-----+
|station_id|date|element|value|m_flag|q_flag|s_flag|time|count|
+----------+----+-------+-----+------+------+------+----+-----+
+----------+----+-------+-----+------+------+------+----+-----+



In [35]:
# compare to 1763

daily_1763 = (
    spark.read.format("com.databricks.spark.csv")
    .option("header", "false")
    .option("inferSchema", "false")
    .schema(schema_daily)
    .load("hdfs:///data/ghcnd/daily/1763.csv.gz")
)

show_as_html(daily_1763, 10)

Unnamed: 0,station_id,date,element,value,m_flag,q_flag,s_flag,time
0,ITE00100554,17630101,TMAX,-36,,,E,
1,ITE00100554,17630101,TMIN,-50,,,E,
2,ITE00100554,17630102,TMAX,-26,,,E,
3,ITE00100554,17630102,TMIN,-40,,,E,
4,ITE00100554,17630103,TMAX,-9,,,E,
5,ITE00100554,17630103,TMIN,-29,,,E,
6,ITE00100554,17630104,TMAX,-4,,,E,
7,ITE00100554,17630104,TMIN,-24,,,E,
8,ITE00100554,17630105,TMAX,21,,,E,
9,ITE00100554,17630105,TMIN,1,,,E,


In [41]:
print(daily_1763.count())

print(daily_1763.select("station_id").distinct().count())

730
1


In [42]:
# compare to 1773

daily_1773 = (
    spark.read.format("com.databricks.spark.csv")
    .option("header", "false")
    .option("inferSchema", "false")
    .schema(schema_daily)
    .load("hdfs:///data/ghcnd/daily/1773.csv.gz")
)

show_as_html(daily_1773, 10)

Unnamed: 0,station_id,date,element,value,m_flag,q_flag,s_flag,time
0,ITE00100554,17730101,TMAX,56,,,E,
1,ITE00100554,17730101,TMIN,36,,,E,
2,ITE00100554,17730102,TMAX,40,,,E,
3,ITE00100554,17730102,TMIN,16,,,E,
4,ITE00100554,17730103,TMAX,37,,,E,
5,ITE00100554,17730103,TMIN,23,,,E,
6,ITE00100554,17730104,TMAX,18,,,E,
7,ITE00100554,17730104,TMIN,-2,,,E,
8,ITE00100554,17730105,TMAX,18,,,E,
9,ITE00100554,17730105,TMIN,4,,,E,


In [43]:
print(daily_1773.count())

print(daily_1773.select("station_id").distinct().count())

730
1


In [44]:
# compare to 1768

daily_1768 = (
    spark.read.format("com.databricks.spark.csv")
    .option("header", "false")
    .option("inferSchema", "false")
    .schema(schema_daily)
    .load("hdfs:///data/ghcnd/daily/1768.csv.gz")
)

show_as_html(daily_1768, 10)

Unnamed: 0,station_id,date,element,value,m_flag,q_flag,s_flag,time
0,ITE00100554,17680101,TMAX,-33,,I,E,
1,ITE00100554,17680101,TMIN,-57,,,E,
2,ITE00100554,17680102,TMAX,-1,,,E,
3,ITE00100554,17680102,TMIN,-15,,,E,
4,ITE00100554,17680103,TMAX,-18,,,E,
5,ITE00100554,17680103,TMIN,-32,,,E,
6,ITE00100554,17680104,TMAX,-8,,,E,
7,ITE00100554,17680104,TMIN,-38,,,E,
8,ITE00100554,17680105,TMAX,-46,,,E,
9,ITE00100554,17680105,TMIN,-76,,,E,


In [45]:
print(daily_1768.count())

print(daily_1768.select("station_id").distinct().count())

732
1


### Question 2

In [8]:
# load 1000 rows of daily 

schema_daily = StructType([
    StructField("station_id", StringType(), True),
    StructField("date", StringType(), True),
    StructField("element", StringType(), True),
    StructField("value", IntegerType(), True),
    StructField("m_flag", StringType(), True),
    StructField("q_flag", StringType(), True),
    StructField("s_flag", StringType(), True),
    StructField("time", StringType(), True)
])

daily = (
    spark.read.format("com.databricks.spark.csv")
    .option("header", "false")
    .option("inferSchema", "false")
    .schema(schema_daily)
    .load("hdfs:///data/ghcnd/daily/2023.csv.gz")
    .limit(1000))

daily.show(10)


+-----------+--------+-------+-----+------+------+------+----+
| station_id|    date|element|value|m_flag|q_flag|s_flag|time|
+-----------+--------+-------+-----+------+------+------+----+
|AE000041196|20230101|   TMAX|  252|  null|  null|     S|null|
|AE000041196|20230101|   TMIN|  149|  null|  null|     S|null|
|AE000041196|20230101|   PRCP|    0|     D|  null|     S|null|
|AE000041196|20230101|   TAVG|  207|     H|  null|     S|null|
|AEM00041194|20230101|   TMAX|  255|  null|  null|     S|null|
|AEM00041194|20230101|   TMIN|  186|  null|  null|     S|null|
|AEM00041194|20230101|   PRCP|    0|  null|  null|     S|null|
|AEM00041194|20230101|   TAVG|  223|     H|  null|     S|null|
|AEM00041217|20230101|   TMAX|  248|  null|  null|     S|null|
|AEM00041217|20230101|   TMIN|  184|  null|  null|     S|null|
+-----------+--------+-------+-----+------+------+------+----+
only showing top 10 rows



In [9]:
# Read the first 10 lines from the CSV file to check my extraction was correct above given nulls
spark.sparkContext.textFile("hdfs:///data/ghcnd/daily/2023.csv.gz").take(20)


['AE000041196,20230101,TMAX,252,,,S,',
 'AE000041196,20230101,TMIN,149,,,S,',
 'AE000041196,20230101,PRCP,0,D,,S,',
 'AE000041196,20230101,TAVG,207,H,,S,',
 'AEM00041194,20230101,TMAX,255,,,S,',
 'AEM00041194,20230101,TMIN,186,,,S,',
 'AEM00041194,20230101,PRCP,0,,,S,',
 'AEM00041194,20230101,TAVG,223,H,,S,',
 'AEM00041217,20230101,TMAX,248,,,S,',
 'AEM00041217,20230101,TMIN,184,,,S,',
 'AEM00041217,20230101,TAVG,215,H,,S,',
 'AEM00041218,20230101,TMAX,254,,,S,',
 'AEM00041218,20230101,TMIN,145,,,S,',
 'AEM00041218,20230101,TAVG,193,H,,S,',
 'AG000060390,20230101,TMIN,47,,,S,',
 'AG000060390,20230101,TAVG,128,H,,S,',
 'AG000060590,20230101,TAVG,134,H,,S,',
 'AG000060611,20230101,TMIN,24,,,S,',
 'AG000060611,20230101,TAVG,104,H,,S,',
 'AGE00147708,20230101,TMIN,96,,,S,']

In [10]:
# see how many values in columns with null
daily.groupBy("q_flag").count().orderBy(F.desc("count")).show() # "Blank = did not fail any quality assurance check"
daily.groupBy("time").count().orderBy(F.desc("count")).show() 

+------+-----+
|q_flag|count|
+------+-----+
|  null| 1000|
+------+-----+

+----+-----+
|time|count|
+----+-----+
|null|  996|
|2400|    3|
|1000|    1|
+----+-----+



In [11]:
!hadoop fs -cat hdfs:///data/ghcnd/ghcnd-stations.txt | head -n 10 # view first 10 rows

ACW00011604  17.1167  -61.7833   10.1    ST JOHNS COOLIDGE FLD                       
ACW00011647  17.1333  -61.7833   19.2    ST JOHNS                                    
AE000041196  25.3330   55.5170   34.0    SHARJAH INTER. AIRP            GSN     41196
AEM00041194  25.2550   55.3640   10.4    DUBAI INTL                             41194
AEM00041217  24.4330   54.6510   26.8    ABU DHABI INTL                         41217
AEM00041218  24.2620   55.6090  264.9    AL AIN INTL                            41218
AF000040930  35.3170   69.0170 3366.0    NORTH-SALANG                   GSN     40930
AFM00040938  34.2100   62.2280  977.2    HERAT                                  40938
AFM00040948  34.5660   69.2120 1791.3    KABUL INTL                             40948
AFM00040990  31.5000   65.8500 1010.0    KANDAHAR AIRPORT                       40990
cat: Unable to write to output stream.


In [4]:
# load stations metatable which has fixed-width formatting (as do all the metadata tables)

stations = (
    spark.read.text("hdfs:///data/ghcnd/ghcnd-stations.txt")
)
    
stations = stations.select(
    stations.value.substr(1, 11).alias("station_id"),   # station id
    stations.value.substr(13, 8).alias("latitude"),     # latitude
    stations.value.substr(22, 9).alias("longitude"),    # longitude
    stations.value.substr(32, 6).alias("elevation"),    # elevation
    stations.value.substr(39, 2).alias("state"),        # state
    stations.value.substr(42, 30).alias("name"),        # name
    stations.value.substr(73, 3).alias("gsn_flag"),     # GSN flag
    stations.value.substr(77, 3).alias("hcn_crn_flag"),# HCN/CRN flag
    stations.value.substr(81, 5).alias("wmo_id")       # WMO id    
)

# chage data tyes where necessary
stations = stations.select(
    F.col("station_id").cast("string"),
    F.col("latitude").cast("float"),
    F.col("longitude").cast("float"),
    F.col("elevation").cast("float"),
    F.col("state").cast("string"),
    F.col("name").cast("string"),
    F.col("gsn_flag").cast("string"),
    F.col("hcn_crn_flag").cast("string"),
    F.col("wmo_id").cast("integer")
)

show_as_html(stations, 100)

Unnamed: 0,station_id,latitude,longitude,elevation,state,name,gsn_flag,hcn_crn_flag,wmo_id
0,ACW00011604,17.116699,-61.783298,10.100000,,ST JOHNS COOLIDGE FLD,,,
1,ACW00011647,17.133301,-61.783298,19.200001,,ST JOHNS,,,
2,AE000041196,25.333000,55.516998,34.000000,,SHARJAH INTER. AIRP,GSN,,41196.0
3,AEM00041194,25.254999,55.363998,10.400000,,DUBAI INTL,,,41194.0
4,AEM00041217,24.433001,54.651001,26.799999,,ABU DHABI INTL,,,41217.0
5,AEM00041218,24.261999,55.609001,264.899994,,AL AIN INTL,,,41218.0
6,AF000040930,35.317001,69.016998,3366.000000,,NORTH-SALANG,GSN,,40930.0
7,AFM00040938,34.209999,62.228001,977.200012,,HERAT,,,40938.0
8,AFM00040948,34.566002,69.211998,1791.300049,,KABUL INTL,,,40948.0
9,AFM00040990,31.500000,65.849998,1010.000000,,KANDAHAR AIRPORT,,,40990.0


In [26]:
# row 91 has different formatting so has not parsed correctly. How to manage?? - ignore

!hadoop fs -cat hdfs:///data/ghcnd/ghcnd-stations.txt | sed -n '89p'
!hadoop fs -cat hdfs:///data/ghcnd/ghcnd-stations.txt | sed -n '90p'
!hadoop fs -cat hdfs:///data/ghcnd/ghcnd-stations.txt | sed -n '91p' # inconsistent formatting
!hadoop fs -cat hdfs:///data/ghcnd/ghcnd-stations.txt | sed -n '92p'

stations.filter(F.col("station_id") == "AGM00060620").show()

AGM00060603  29.7170    6.7000  252.0    RHOURD NOUSS                           60603
AGM00060607  29.2370    0.2760  313.0    TIMIMOUN                               60607
AGM00060620  27.838                                                                                                                                                                                                                                                                                                                                                                                                                                                   0.9500  399.0    BORDJ-BADJ-MOKHTAR                     60686
AGM00060690  19.5670    5.7670  401.0    IN-GUEZZAM                             60690
+-----------+--------+---------+---------+-----+--------------------+--------+------------+------+
| station_id|latitude|longitude|elevation|state|                name|gsn_flag|hcn_crn_flag|wmo_id|
+-----------+--------+------

In [17]:
stations.groupBy("state").count().orderBy(F.desc("count")).show() # only for some countries
stations.groupBy("hcn_crn_flag").count().orderBy(F.desc("count")).show() # one row shows inconsistent formatting

+-----+-----+
|state|count|
+-----+-----+
|     |43983|
|   TX| 6154|
|   CO| 4640|
|   CA| 3080|
|   NC| 2612|
|   NE| 2387|
|   NM| 2234|
|   KS| 2217|
|   MN| 2199|
|   FL| 2142|
|   IL| 2127|
|   ON| 1985|
|   OR| 1981|
|   IN| 1924|
|   NY| 1815|
|   BC| 1713|
|   AZ| 1655|
|   TN| 1655|
|   WA| 1646|
|   MO| 1565|
+-----+-----+
only showing top 20 rows

+------------+------+
|hcn_crn_flag| count|
+------------+------+
|            |124530|
|         HCN|  1218|
|         CRN|   234|
|            |     1|
+------------+------+



In [5]:
# load countries
countries = (
    spark.read.text("hdfs:///data/ghcnd/ghcnd-countries.txt")
)
    
countries = countries.select(
    countries.value.substr(1, 2).alias("country_code"),   # country_code
    countries.value.substr(4, 61).alias("country_name"),  # country_name 
)

show_as_html(countries, 10)

Unnamed: 0,country_code,country_name
0,AC,Antigua and Barbuda
1,AE,United Arab Emirates
2,AF,Afghanistan
3,AG,Algeria
4,AJ,Azerbaijan
5,AL,Albania
6,AM,Armenia
7,AO,Angola
8,AQ,American Samoa [United States]
9,AR,Argentina


In [6]:
# load states
states = (
    spark.read.text("hdfs:///data/ghcnd/ghcnd-states.txt")
)
    
states = states.select(
    states.value.substr(1, 2).alias("state_code"),   # state_code
    states.value.substr(4, 47).alias("state_name"),  # state_name 
)

show_as_html(states, 10)

Unnamed: 0,state_code,state_name
0,AB,ALBERTA
1,AK,ALASKA
2,AL,ALABAMA
3,AR,ARKANSAS
4,AS,AMERICAN SAMOA
5,AZ,ARIZONA
6,BC,BRITISH COLUMBIA
7,CA,CALIFORNIA
8,CO,COLORADO
9,CT,CONNECTICUT


In [20]:
states.groupBy("state_name").count().orderBy(F.desc("count")).show(100)

# Alamaba looks weird so checking specific row
row = states.filter(F.col("state_name").contains("ALABAMA"))
show_as_html(row)

+--------------------+-----+
|          state_name|count|
+--------------------+-----+
|            ARKANSAS|    1|
|             FLORIDA|    1|
|            ILLINOIS|    1|
|            KENTUCKY|    1|
|           MINNESOTA|    1|
|         MISSISSIPPI|    1|
|       NEW BRUNSWICK|    1|
|       NEW HAMPSHIRE|    1|
|            OKLAHOMA|    1|
|PRINCE EDWARD ISLAND|    1|
|               PALAU|    1|
|              QUEBEC|    1|
|      SOUTH CAROLINA|    1|
|           TENNESSEE|    1|
|                UTAH|    1|
|         CONNECTICUT|    1|
|          MICRONESIA|    1|
|        SOUTH DAKOTA|    1|
|        SASKATCHEWAN|    1|
|               TEXAS|    1|
|              ALASKA|    1|
|ALABAMA          ...|    1|
|    BRITISH COLUMBIA|    1|
|          CALIFORNIA|    1|
|DISTRICT OF COLUMBIA|    1|
|           LOUISIANA|    1|
|            MARYLAND|    1|
|NORTHERN MARIANA ...|    1|
|              OREGON|    1|
|     PACIFIC ISLANDS|    1|
|            VIRGINIA|    1|
|      VIRGIN 

Unnamed: 0,state_code,state_name
0,AL,ALABAMA


In [8]:
# load inventory
inventory = (
    spark.read.text("hdfs:///data/ghcnd/ghcnd-inventory.txt")
)
    
inventory = inventory.select(
    inventory.value.substr(1, 11).alias("station_id"),   # station id
    inventory.value.substr(13, 8).alias("latitude"),     # latitude
    inventory.value.substr(22, 9).alias("longitude"),    # longitude
    inventory.value.substr(32, 4).alias("element"),      # element
    inventory.value.substr(37, 4).alias("first_year"),   # first_year
    inventory.value.substr(42, 4).alias("last_year")     # last_year     
)

inventory = inventory.select(
    F.col("station_id").cast("string"),
    F.col("latitude").cast("float"),
    F.col("longitude").cast("float"),
    F.col("element").cast("string"),
    F.col("first_year").cast("integer"),
    F.col("last_year").cast("integer")
)


In [22]:
show_as_html(inventory, 10)

Unnamed: 0,station_id,latitude,longitude,element,first_year,last_year
0,ACW00011604,17.116699,-61.783298,TMAX,1949,1949
1,ACW00011604,17.116699,-61.783298,TMIN,1949,1949
2,ACW00011604,17.116699,-61.783298,PRCP,1949,1949
3,ACW00011604,17.116699,-61.783298,SNOW,1949,1949
4,ACW00011604,17.116699,-61.783298,SNWD,1949,1949
5,ACW00011604,17.116699,-61.783298,PGTM,1949,1949
6,ACW00011604,17.116699,-61.783298,WDFG,1949,1949
7,ACW00011604,17.116699,-61.783298,WSFG,1949,1949
8,ACW00011604,17.116699,-61.783298,WT03,1949,1949
9,ACW00011604,17.116699,-61.783298,WT08,1949,1949


In [23]:
print(inventory.groupBy("element").count().count()) # number of rows per elements

inventory.groupBy("element").count().orderBy(F.desc("count")).show(145) # one inconsistent row


145
+-------+------+
|element| count|
+-------+------+
|   PRCP|123966|
|   SNOW| 77269|
|   MDPR| 68999|
|   SNWD| 64974|
|   DAPR| 61735|
|   TMAX| 40392|
|   TMIN| 40288|
|   WESD| 25032|
|   WESF| 24243|
|   WT01| 17073|
|   WT03| 16512|
|   TOBS| 15711|
|   WT05| 14639|
|   WT04| 14603|
|   WT11| 13373|
|   DWPR| 12403|
|   WT06| 11871|
|   MDSF| 10745|
|   TAVG|  9456|
|   WT14|  8447|
|   WT08|  8307|
|   WT16|  7741|
|   WT18|  6356|
|   DASF|  6070|
|   WT09|  5639|
|   WT07|  4449|
|   WSFG|  1748|
|   WDFG|  1596|
|   PGTM|  1497|
|   AWND|  1388|
|   WT10|  1382|
|   WT02|  1255|
|   WDF2|  1193|
|   WSF2|  1190|
|   WSF5|  1181|
|   WDF5|  1181|
|   WDMV|  1073|
|   EVAP|   999|
|   FMTM|   956|
|   TSUN|   927|
|   MDEV|   804|
|   DAEV|   804|
|   MDTN|   787|
|   DATN|   787|
|   MDWM|   768|
|   DAWM|   767|
|   MDTX|   765|
|   DATX|   765|
|   MXPN|   683|
|   MNPN|   678|
|   WT13|   534|
|   RHMX|   476|
|   RHMN|   476|
|   RHAV|   475|
|   AWBT|   474|
|   ASLP| 

In [27]:
# filter row where 'element' is equal to '36.5000' -> this has changed to '|'' since first run. Amended code to base on id.
inventory.filter(F.col("station_id") == 'AGM00060402').show()

# looks like another formatting error in the original data?? -> missing data when looking at the txt file later

+-----------+--------+---------+-------+----------+---------+
| station_id|latitude|longitude|element|first_year|last_year|
+-----------+--------+---------+-------+----------+---------+
|AGM00060402|  36.712|     5.07|   TMAX|      1973|     2024|
|AGM00060402|  36.712|     5.07|   TMIN|      1973|     2024|
|AGM00060402|  36.712|     5.07|   PRCP|      1979|     2024|
|AGM00060402|  36.712|     5.07|   SNWD|      2005|     2005|
|AGM00060402|  36.712|     5.07|       |      null|     null|
+-----------+--------+---------+-------+----------+---------+



In [9]:
# number of rows in each metatable
print(stations.count())
print(countries.count())
print(states.count())
print(inventory.count())

125983
219
74
747382


In [23]:
# count null values in stations table, column "wmo_id"
stations.filter(F.col("wmo_id").isNull()).count()

118024

## Question 3

In [10]:
# (a)

# extract country_code from station_id (first two letters) and add as new variable
stations = stations.withColumn('country_code', F.col('station_id').substr(1, 2))

stations.show(10)

+-----------+--------+---------+---------+-----+--------------------+--------+------------+------+------------+
| station_id|latitude|longitude|elevation|state|                name|gsn_flag|hcn_crn_flag|wmo_id|country_code|
+-----------+--------+---------+---------+-----+--------------------+--------+------------+------+------------+
|ACW00011604| 17.1167| -61.7833|     10.1|     |ST JOHNS COOLIDGE...|        |            |  null|          AC|
|ACW00011647| 17.1333| -61.7833|     19.2|     |ST JOHNS         ...|        |            |  null|          AC|
|AE000041196|  25.333|   55.517|     34.0|     |SHARJAH INTER. AI...|     GSN|            | 41196|          AE|
|AEM00041194|  25.255|   55.364|     10.4|     |DUBAI INTL       ...|        |            | 41194|          AE|
|AEM00041217|  24.433|   54.651|     26.8|     |ABU DHABI INTL   ...|        |            | 41217|          AE|
|AEM00041218|  24.262|   55.609|    264.9|     |AL AIN INTL      ...|        |            | 41218|      

In [11]:
# (b)

# join countries to stations using country_code var created above
stations = (
    stations
    .join(
        countries,
        on="country_code",
        how="left"
    )
)
show_as_html(stations)

Unnamed: 0,country_code,station_id,latitude,longitude,elevation,state,name,gsn_flag,hcn_crn_flag,wmo_id,country_name
0,AC,ACW00011604,17.116699,-61.783298,10.1,,ST JOHNS COOLIDGE FLD,,,,Antigua and Barbuda
1,AC,ACW00011647,17.133301,-61.783298,19.200001,,ST JOHNS,,,,Antigua and Barbuda
2,AE,AE000041196,25.333,55.516998,34.0,,SHARJAH INTER. AIRP,GSN,,41196.0,United Arab Emirates
3,AE,AEM00041194,25.254999,55.363998,10.4,,DUBAI INTL,,,41194.0,United Arab Emirates
4,AE,AEM00041217,24.433001,54.651001,26.799999,,ABU DHABI INTL,,,41217.0,United Arab Emirates
5,AE,AEM00041218,24.261999,55.609001,264.899994,,AL AIN INTL,,,41218.0,United Arab Emirates
6,AF,AF000040930,35.317001,69.016998,3366.0,,NORTH-SALANG,GSN,,40930.0,Afghanistan
7,AF,AFM00040938,34.209999,62.228001,977.200012,,HERAT,,,40938.0,Afghanistan
8,AF,AFM00040948,34.566002,69.211998,1791.300049,,KABUL INTL,,,40948.0,Afghanistan
9,AF,AFM00040990,31.5,65.849998,1010.0,,KANDAHAR AIRPORT,,,40990.0,Afghanistan


In [13]:
# (c)

# rename 'state' as 'state_code' so can join with states df on 'state_code'
stations = stations.withColumnRenamed("state", "state_code")

# join states to stations on state_code 
stations = (
    stations
    .join(
        states,
        on="state_code",
        how="left"
    )
)
show_as_html(stations)

Unnamed: 0,state_code,country_code,station_id,latitude,longitude,elevation,name,gsn_flag,hcn_crn_flag,wmo_id,country_name,state_name
0,,AC,ACW00011604,17.116699,-61.783298,10.1,ST JOHNS COOLIDGE FLD,,,,Antigua and Barbuda,
1,,AC,ACW00011647,17.133301,-61.783298,19.200001,ST JOHNS,,,,Antigua and Barbuda,
2,,AE,AE000041196,25.333,55.516998,34.0,SHARJAH INTER. AIRP,GSN,,41196.0,United Arab Emirates,
3,,AE,AEM00041194,25.254999,55.363998,10.4,DUBAI INTL,,,41194.0,United Arab Emirates,
4,,AE,AEM00041217,24.433001,54.651001,26.799999,ABU DHABI INTL,,,41217.0,United Arab Emirates,
5,,AE,AEM00041218,24.261999,55.609001,264.899994,AL AIN INTL,,,41218.0,United Arab Emirates,
6,,AF,AF000040930,35.317001,69.016998,3366.0,NORTH-SALANG,GSN,,40930.0,Afghanistan,
7,,AF,AFM00040938,34.209999,62.228001,977.200012,HERAT,,,40938.0,Afghanistan,
8,,AF,AFM00040948,34.566002,69.211998,1791.300049,KABUL INTL,,,40948.0,Afghanistan,
9,,AF,AFM00040990,31.5,65.849998,1010.0,KANDAHAR AIRPORT,,,40990.0,Afghanistan,


In [15]:
#(d)

# first and last years active for each station
inventory_years = (inventory.groupBy("station_id")
                   .agg(F.min("first_year").alias("first_year_active"),
                        F.max("last_year").alias("last_year_active"))
                   .orderBy(F.asc("first_year_active"))
                  )

#inventory.groupBy("element").count().orderBy(F.desc("count")).show(150)


In [28]:
# Show the first 10 rows of the result
show_as_html(inventory_years, 10)

# count the number of rows
print(inventory_years.count())

# there were 125,983 stations in the stations table. Therefore, 8 must not be represented in the inventory table.

Unnamed: 0,station_id,first_year_active,last_year_active
0,ASN00002009,1750,2023
1,ASN00007101,1750,2023
2,ASN00002044,1750,2023
3,ASN00001006,1750,2023
4,ASN00002048,1750,2023
5,ASN00004019,1750,2023
6,ASN00007081,1750,2023
7,ASN00006011,1750,2024
8,ASN00001020,1750,2023
9,ASN00006102,1750,2023


125975


In [29]:
# find the stations first active in 2023
active_2023 = inventory_years.filter(F.col("first_year_active") == '2023')

print(active_2023.count())

1035


In [30]:
# of the stations first active in 2023, how many only reocrded in 2023 vs how many have records for 2024?
(
    active_2023.groupBy("last_year_active")
    .count()
    .orderBy(F.desc("count"))
    .show()
)

+----------------+-----+
|last_year_active|count|
+----------------+-----+
|            2024|  878|
|            2023|  157|
+----------------+-----+



In [31]:
# how many stations collected data in 1750 when records first began
active_1750 = inventory_years.filter(F.col("first_year_active") == '1750')
print(active_1750.count())

2423


In [32]:
# of the stations collective data in 1750, when did they stop recording and how many are stilla ctive in 2024?
(
    active_1750.groupBy("last_year_active")
    .count()
    .orderBy(F.desc("count"))
    .show()
)

+----------------+-----+
|last_year_active|count|
+----------------+-----+
|            2023| 2352|
|            2024|   64|
|            2022|    3|
|            2021|    2|
|            2020|    1|
|            2013|    1|
+----------------+-----+



In [33]:
# find the number of unique elements collected for each station
number_unique_elements = (
    inventory.groupBy("station_id")
    .agg(F.countDistinct("element").alias("no_unique_elements"))
    .orderBy(F.asc("no_unique_elements"))
)

number_unique_elements.show()

print(number_unique_elements.count())

+-----------+------------------+
| station_id|no_unique_elements|
+-----------+------------------+
|AJ000037883|                 1|
|ASN00033292|                 1|
|ASN00009718|                 1|
|USC00413654|                 1|
|ASN00009719|                 1|
|USC00417528|                 1|
|ASN00009770|                 1|
|UY000001572|                 1|
|ASN00009781|                 1|
|VE000001869|                 1|
|ASN00009834|                 1|
|VE000009060|                 1|
|ASN00010184|                 1|
|WA004912120|                 1|
|ASN00010742|                 1|
|WA007444590|                 1|
|ASN00010748|                 1|
|ASN00023107|                 1|
|ASN00033126|                 1|
|ASN00023304|                 1|
+-----------+------------------+
only showing top 20 rows

125975


In [34]:
# find the stations with only one unique element recorded
elementCount1 = number_unique_elements.filter(F.col("no_unique_elements") == '1')
print(elementCount1.count())

16382


In [35]:
# create set of the unique elements collected per station
list_unique_elements = inventory.groupBy("station_id").agg(F.collect_set(F.col("element")).alias("unique_elements"))

show_as_html(list_unique_elements, 10)

Unnamed: 0,station_id,unique_elements
0,AEM00041217,"[TMAX, TMIN, PRCP, TAVG]"
1,AGE00147708,"[TMAX, TMIN, PRCP, SNWD, TAVG]"
2,AGE00147710,"[TMAX, TMIN, PRCP, TAVG]"
3,AGE00147714,"[TMAX, TMIN, PRCP]"
4,AGE00147719,"[TMAX, TMIN, PRCP, TAVG]"
5,AGM00060360,"[TMAX, TMIN, PRCP, TAVG]"
6,AGM00060445,"[TMAX, TMIN, PRCP, SNWD, TAVG]"
7,AGM00060452,"[TMAX, TMIN, PRCP, TAVG]"
8,AGM00060511,"[TMAX, TMIN, PRCP, SNWD, TAVG]"
9,AGM00060540,"[TMAX, TMIN, PRCP, SNWD, TAVG]"


In [36]:
# count core elements vs other elements

# create list of the core elements
core_elements = ['PRCP', 'SNOW', 'SNWD', 'TMAX', 'TMIN']

# determine, per station, if the collected element is in the core elements list. Count the distinct number of elements collected 
# for each station in the core_elements list, versus not. 
core_vs_other = inventory.groupBy("station_id").agg(
    F.countDistinct(F.when(F.col("element").isin(core_elements), F.col("element"))).alias("core_element_count"),
    F.countDistinct(F.when(~F.col("element").isin(core_elements), F.col("element"))).alias("other_element_count")
)

core_vs_other.show()

+-----------+------------------+-------------------+
| station_id|core_element_count|other_element_count|
+-----------+------------------+-------------------+
|USC00030664|                 5|                 10|
|USC00030828|                 5|                 10|
|USC00030842|                 3|                 11|
|USC00031442|                 5|                 15|
|USC00031582|                 3|                 10|
|USC00031829|                 3|                 10|
|USC00031900|                 5|                  7|
|USC00031956|                 2|                  0|
|USC00031962|                 5|                  9|
|USC00031982|                 5|                 15|
|USC00032204|                 3|                 12|
|USC00032315|                 5|                  8|
|USC00032442|                 5|                 14|
|USC00032670|                 3|                 14|
|USC00032978|                 5|                  9|
|USC00033229|                 3|              

In [37]:
# filter for stations where 'core_count' is 5
all_core_stations = core_vs_other.filter(F.col("core_element_count") == 5)

all_core_stations.count()

20467

In [38]:
# filter for stations that collected 'PRCP' only
prcp_stations = inventory.filter(F.col('element') == 'PRCP').select("station_id").distinct()

# filter for stations who collected only one core element and did not collect any non-core elements
one_core_only_stations = core_vs_other.filter((F.col("core_element_count") == 1) & (F.col("other_element_count") == 0))

# join the two tables above to determine the stations who collected on PRCP
prcp_only_stations = prcp_stations.join(one_core_only_stations.select(
            F.col("station_id"),
            F.col("core_element_count").alias("PRCP_only")
        ), on="station_id", how="inner")

prcp_only_stations.show()
prcp_only_stations.count()

+-----------+---------+
| station_id|PRCP_only|
+-----------+---------+
|AJ000037679|        1|
|AJ000037831|        1|
|AJ000037912|        1|
|AJ000037981|        1|
|AM000037683|        1|
|AM000037698|        1|
|AQC00914141|        1|
|AR000000002|        1|
|ASN00001003|        1|
|ASN00002004|        1|
|ASN00002033|        1|
|ASN00002034|        1|
|ASN00002055|        1|
|ASN00003046|        1|
|ASN00003047|        1|
|ASN00003064|        1|
|ASN00003065|        1|
|ASN00003081|        1|
|ASN00004087|        1|
|ASN00006014|        1|
+-----------+---------+
only showing top 20 rows



16301

In [39]:
# join the information found above to the stations table to create enhanced table
stations = stations.join(inventory_years, "station_id", "left")

stations = stations.join(number_unique_elements, "station_id", "left")

stations = stations.join(list_unique_elements, "station_id", "left")

stations = stations.join(core_vs_other, "station_id", "left")

stations = stations.join(prcp_only_stations, "station_id", "left")

show_as_html(stations, 10)

Unnamed: 0,station_id,state_code,country_code,latitude,longitude,elevation,name,gsn_flag,hcn_crn_flag,wmo_id,country_name,state_name,first_year_active,last_year_active,no_unique_elements,unique_elements,core_element_count,other_element_count,PRCP_only
0,AEM00041217,,AE,24.433001,54.651001,26.799999,ABU DHABI INTL,,,41217.0,United Arab Emirates,,1983,2024,4,"[TMAX, TMIN, PRCP, TAVG]",3,1,
1,AGE00147708,,AG,36.720001,4.05,222.0,TIZI OUZOU,,,60395.0,Algeria,,1879,2024,5,"[TMAX, TMIN, PRCP, SNWD, TAVG]",4,1,
2,AGE00147710,,AG,36.75,5.1,9.0,BEJAIA-BOUGIE (PORT),,,60401.0,Algeria,,1909,2009,4,"[TMAX, TMIN, PRCP, TAVG]",3,1,
3,AGE00147714,,AG,35.77,0.8,78.0,ORAN-CAP FALCON,,,,Algeria,,1896,1938,3,"[TMAX, TMIN, PRCP]",3,0,
4,AGE00147719,,AG,33.799702,2.89,767.0,LAGHOUAT,,,60545.0,Algeria,,1888,2024,4,"[TMAX, TMIN, PRCP, TAVG]",3,1,
5,AGM00060360,,AG,36.821999,7.809,4.9,ANNABA,,,60360.0,Algeria,,1945,2024,4,"[TMAX, TMIN, PRCP, TAVG]",3,1,
6,AGM00060445,,AG,36.178001,5.324,1050.0,SETIF AIN ARNAT,,,60445.0,Algeria,,1957,2024,5,"[TMAX, TMIN, PRCP, SNWD, TAVG]",4,1,
7,AGM00060452,,AG,35.817001,-0.267,4.0,ARZEW,,,60452.0,Algeria,,1985,2024,4,"[TMAX, TMIN, PRCP, TAVG]",3,1,
8,AGM00060511,,AG,35.341,1.463,989.099976,BOU CHEKIF,,,60511.0,Algeria,,1983,2024,5,"[TMAX, TMIN, PRCP, SNWD, TAVG]",4,1,
9,AGM00060540,,AG,34.150002,0.067,1001.0,EL-KHEITER,,,60540.0,Algeria,,1981,2022,5,"[TMAX, TMIN, PRCP, SNWD, TAVG]",4,1,


In [40]:
# unable to save column of lists in csv (and had issues with parquet in next notebook so abandoned that), 
# so need to convert list to string with , as the delimiter
stations = stations.withColumn('unique_elements', F.concat_ws(',', F.col('unique_elements')))

In [None]:
# commented out so I don't rewrite when running notebook later

#save stations to outputs
"""
data_path = f"hdfs:///user/dcp31/assignment_1/stations1" 

(
    stations.write
    .option("compression", "gzip")
    .option("header", "true")
    .mode("overwrite")
    .csv(data_path)
)
"""

In [42]:
# join the daily subset to stations, keeping all daily observations if there is no match to stations
dataframe = daily.join(stations, "station_id", "left")

dataframe.show(10)

+-----------+--------+-------+-----+------+------+------+----+----------+------------+--------+---------+---------+--------------------+--------+------------+------+--------------------+----------+-----------------+----------------+------------------+--------------------+------------------+-------------------+---------+
| station_id|    date|element|value|m_flag|q_flag|s_flag|time|state_code|country_code|latitude|longitude|elevation|                name|gsn_flag|hcn_crn_flag|wmo_id|        country_name|state_name|first_year_active|last_year_active|no_unique_elements|     unique_elements|core_element_count|other_element_count|PRCP_only|
+-----------+--------+-------+-----+------+------+------+----+----------+------------+--------+---------+---------+--------------------+--------+------------+------+--------------------+----------+-----------------+----------------+------------------+--------------------+------------------+-------------------+---------+
|AEM00041217|20230101|   TMAX|  24

In [43]:
# determine if there are any stations in daily not present in stations

# find the distinct stations in daily
distinct_stations_daily = daily.select("station_id").distinct()

# join the distinct daily stations to stations using a left anti join to keep the rows of daily that do not match rows in 
# stations (i.e. no matching stations_id)
missing_stations = distinct_stations_daily.join(stations.select(F.col("station_id")), "station_id", "leftanti")

missing_stations.show()
print(missing_stations.count())

+-----------+
| station_id|
+-----------+
|AGM00060640|
|AGM00060670|
|AGM00060656|
+-----------+

3


In [None]:
# save to outputs 

'''
data_path1 = f"hdfs:///user/dcp31/assignment_1/countries1"
data_path2 = f"hdfs:///user/dcp31/assignment_1/states1"
data_path3 = f"hdfs:///user/dcp31/assignment_1/inventory1"
data_path4 = f"hdfs:///user/dcp31/assignment_1/daily1"

(
    countries.write
    .option("compression", "gzip")
    .option("header", "true")
    .mode("overwrite") 
    .csv(data_path1)
)

(
    states.write
    .option("compression", "gzip")
    .option("header", "true")
    .mode("overwrite")
    .csv(data_path2)
)

(
    inventory.write
    .option("compression", "gzip")
    .option("header", "true")
    .mode("overwrite")
    .csv(data_path3)
)

(
    daily.write
    .option("compression", "gzip")
    .option("header", "true")
    .mode("overwrite")
    .csv(data_path4)
)

'''

In [16]:
# Run this cell before closing the notebook or kill your spark application by hand using the link in the Spark UI

stop_spark()