In [4]:
from pprint import pprint
from pyspark.sql import SparkSession

# New API
spark_session = SparkSession\
        .builder\
        .master("spark://ben-spark-master:7077") \
        .appName("read_weather_data")\
        .getOrCreate()

# Old API (RDD)
spark_context = spark_session.sparkContext

data_frame = spark_session.read\
    .option("header", "true")\
    .csv('/mnt/nfs/ben-spark-master/teaching/noaa/global-hourly/access/1950/*.csv')\
    .cache()

In [46]:
spark_context.uiWebUrl

http://ben-spark-master:4040


In [5]:
# Count the files:
#$ find -type f | wc -l
# 2546

# Count the lines in 1 file:
# $cat 10224099999.csv | wc -l
# 8756

# Count the CSV rows:
data_frame.count()

8671273

In [6]:
data_frame.show()

+-----------+-------------------+------+---------+---------+---------+-------------+-----------+---------+---------------+--------------+-----------+------------+-------+-------+-------+-----------+--------+------------------+----+----+--------------------+----+----+----+----------------+----+----+--------------------+
|    STATION|               DATE|SOURCE| LATITUDE|LONGITUDE|ELEVATION|         NAME|REPORT_TYPE|CALL_SIGN|QUALITY_CONTROL|           WND|        CIG|         VIS|    TMP|    DEW|    SLP|        AA1|     AY1|               GA1| GA2| GA3|                 GF1| IA1| KA1| KA2|             MD1| MW1| MW2|                 EQD|
+-----------+-------------------+------+---------+---------+---------+-------------+-----------+---------+---------------+--------------+-----------+------------+-------+-------+-------+-----------+--------+------------------+----+----+--------------------+----+----+----+----------------+----+----+--------------------+
|03135099999|1950-01-01T00:00:00|    

In [48]:
data_frame.printSchema()

root
 |-- STATION: string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- SOURCE: string (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)
 |-- ELEVATION: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- REPORT_TYPE: string (nullable = true)
 |-- CALL_SIGN: string (nullable = true)
 |-- QUALITY_CONTROL: string (nullable = true)
 |-- WND: string (nullable = true)
 |-- CIG: string (nullable = true)
 |-- VIS: string (nullable = true)
 |-- TMP: string (nullable = true)
 |-- DEW: string (nullable = true)
 |-- SLP: string (nullable = true)
 |-- AA1: string (nullable = true)
 |-- AY1: string (nullable = true)
 |-- GA1: string (nullable = true)
 |-- GA2: string (nullable = true)
 |-- GA3: string (nullable = true)
 |-- GF1: string (nullable = true)
 |-- IA1: string (nullable = true)
 |-- KA1: string (nullable = true)
 |-- KA2: string (nullable = true)
 |-- MD1: string (nullable = true)
 |-- MW1: string (nullable = true

In [49]:
# 'old' RDD API underneath...
data_frame.rdd.take(1)

[Row(STATION='03135099999', DATE='1950-01-01T00:00:00', SOURCE='4', LATITUDE='55.509444', LONGITUDE='-4.586667', ELEVATION='19.81', NAME='PRESTWICK, UK', REPORT_TYPE='FM-12', CALL_SIGN='99999', QUALITY_CONTROL='V020', WND='130,1,N,0026,1', CIG='22000,1,C,N', VIS='006000,1,N,9', TMP='+0033,1', DEW='+0028,1', SLP='10263,1', AA1=None, AY1='1,1,99,9', GA1=None, GA2=None, GA3=None, GF1='05,99,1,99,9,00,1,99999,9,03,1,00,1', IA1=None, KA1=None, KA2=None, MD1='2,1,006,1,+999,9', MW1='02,1', MW2=None, EQD='Q01+000002SCOTLC')]

In [50]:
data_frame.rdd.getNumPartitions()

39

In [51]:
# .distinct()
data_frame.select('NAME').distinct().take(10)

[Row(NAME='PAVELETS, RS'),
 Row(NAME='DZARDZAN, RS'),
 Row(NAME='UST MAJYA, RS'),
 Row(NAME='OKSOY FYR, NO'),
 Row(NAME='LIMNOS ISLAND AIRPORT, GR'),
 Row(NAME='DANMARKSHAVN PORT, GL'),
 Row(NAME='MUKHINO, RS'),
 Row(NAME='VALKENBURG, NL'),
 Row(NAME='UST PORT UST ENISEISK, RS'),
 Row(NAME='TURINSK, RS')]

In [52]:
# .count()
data_frame.select('NAME').distinct().count()

571

In [11]:
# .take() (like .show())
data_frame.filter('NAME == "PRESTWICK, UK"').select('WND').take(10)

[Row(WND='130,1,N,0026,1'),
 Row(WND='135,1,N,0026,1'),
 Row(WND='180,1,N,0036,1'),
 Row(WND='180,1,N,0036,1'),
 Row(WND='140,1,N,0021,1'),
 Row(WND='135,1,N,0021,1'),
 Row(WND='170,1,N,0026,1'),
 Row(WND='180,1,N,0026,1'),
 Row(WND='170,1,N,0026,1'),
 Row(WND='180,1,N,0026,1')]

In [29]:
# Convert a column

import pyspark
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def WND_to_WND_SPEED_MS(wnd):
  return float(wnd.split(',')[3])/10

udf_WND_to_WND_SPEED_MS = udf(WND_to_WND_SPEED_MS, StringType())


data_frame_with_wnd_speed = data_frame.withColumn("WND_SPEED_MS",udf_WND_to_WND_SPEED_MS("WND"))

# .filter()

# 9999: missing (with scale factor of 10)
data_frame_with_wnd_speed = data_frame_with_wnd_speed.filter(data_frame_with_wnd_speed['WND_SPEED_MS'] < 999)

#wnd_split = pyspark.sql.functions.split(data_frame['WND'], ',')
#data_frame_with_wnd_speed = data_frame.withColumn('WND_SPEED_MS', wnd_split.getItem(3))

data_frame_with_wnd_speed.select('WND', 'WND_SPEED_MS').show()

data_frame_with_wnd_speed.select('WND_SPEED_MS').summary().show()

+--------------+------------+
|           WND|WND_SPEED_MS|
+--------------+------------+
|130,1,N,0026,1|         2.6|
|135,1,N,0026,1|         2.6|
|180,1,N,0036,1|         3.6|
|180,1,N,0036,1|         3.6|
|140,1,N,0021,1|         2.1|
|135,1,N,0021,1|         2.1|
|170,1,N,0026,1|         2.6|
|180,1,N,0026,1|         2.6|
|170,1,N,0026,1|         2.6|
|180,1,N,0026,1|         2.6|
|180,1,N,0057,1|         5.7|
|180,1,N,0057,1|         5.7|
|190,1,N,0057,1|         5.7|
|180,1,N,0057,1|         5.7|
|200,1,N,0057,1|         5.7|
|203,1,N,0057,1|         5.7|
|200,1,N,0057,1|         5.7|
|203,1,N,0057,1|         5.7|
|200,1,N,0051,1|         5.1|
|203,1,N,0051,1|         5.1|
+--------------+------------+
only showing top 20 rows

+-------+------------------+
|summary|      WND_SPEED_MS|
+-------+------------------+
|  count|           8601428|
|   mean|  4.04703105111847|
| stddev|3.2041228109591486|
|    min|               0.0|
|    25%|               1.5|
|    50%|             

In [35]:
# .agg(), .orderby()  -- where is the avg most windy place?

most_windy = data_frame_with_wnd_speed.groupby('NAME')\
    .agg({'WND_SPEED_MS': 'mean'})\
    .orderBy('avg(WND_SPEED_MS)', ascending=False)\

# Rename a column
most_windy = most_windy.withColumnRenamed('avg(WND_SPEED_MS)','AVG_WND_SPEED_MS')
    
most_windy.show()

+--------------------+------------------+
|                NAME|  AVG_WND_SPEED_MS|
+--------------------+------------------+
|  VESTMANNAEYJAR, IC|            11.555|
| CAMPBELL ISLAND, NZ|10.924466571835005|
|    GRAN CANARIA, SP|10.658823529411762|
|       LANZAROTE, SP|10.207692307692309|
|AMCHITKA ISLAND, ...| 10.05585070611949|
|MOUNT LAGUNA CAA ...|  9.96358447488591|
|FELDBERG SCHWARZW...|   9.5155817174515|
|LUDERITZ DIAZ POI...| 9.252941176470593|
|PINE SPRINGS GUAD...| 8.779119561493236|
|   ILE AMSTERDAM, FS| 8.772727272727295|
|        WALES, AK US| 8.759103385178399|
|   MARION ISLAND, SF|  8.67556008146638|
|           MATUA, RS| 8.636529680365339|
|    ARGENTIA AUT, CA| 8.558078552616378|
|       CASPER, WY US|  8.35883822610875|
|           ASSAB, ER| 8.353956834532378|
|  BUHTA GAVRIILA, RS| 8.194366197183165|
|   CAPE CAMPBELL, NZ| 8.194150417827288|
|       MOGADISHU, SO| 8.189841269841274|
|       KANIN NOS, RS| 8.145466155810983|
+--------------------+------------

In [41]:
most_windy.limit(20).write.format('csv').save('/mnt/nfs/ben-spark-master/teaching/noaa/output.csv')

In [42]:
# output CSV as ~200 files.
most_windy.write.format('csv').save('/mnt/nfs/ben-spark-master/teaching/noaa/output-all.csv')