# Working with the NOAA dataset

The journal article describing GHCN-Daily is:

Menne, M.J., I. Durre, R.S. Vose, B.E. Gleason, and T.G. Houston, 2012: An overview of the Global Historical Climatology Network-Daily Database. Journal of Atmospheric and Oceanic Technology, 29, 897-910, doi:10.1175/JTECH-D-11-00103.1

In [None]:
#!pip install pyspark

In [1]:
# Display the output of every expression in a cell, not only the last one
from IPython.core.interactiveshell import InteractiveShell

InteractiveShell.ast_node_interactivity = "all"

In [2]:
DATASET_FOLDER = "/media/data-nvme/dev/datasets/WorldBank/"
noaa_csv_path = DATASET_FOLDER + "noaa/ASN*.csv"
SPARK_MASTER = "spark://192.168.0.9:7077"

In [3]:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
import shutil

In [8]:
# import os

# os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
# os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"

# Line count

In [9]:
# noaa_csv_path = '/media/data-nvme/dev/datasets/WorldBank//noaa/ASN00060066.csv'
# noaa_csv_path = DATASET_FOLDER + '/noaa/ASN*.csv'
noaa_csv_path = DATASET_FOLDER + "/france/*.csv"
# noaa_csv_path = DATASET_FOLDER + '/small_dataset/*.csv'


def count_lines():
    """Count every text line (header lines included) of the files matching noaa_csv_path."""
    # configuration
    APP_NAME = "count NOAA all lines"
    conf = SparkConf().setAppName(APP_NAME).setMaster(SPARK_MASTER)
    sc = SparkContext(conf=conf)
    try:
        # core part of the script: one RDD element per line
        files = sc.textFile(noaa_csv_path)
        return files.count()
    finally:
        # always release the cluster resources, even if the job fails
        sc.stop()


total = count_lines()
total

624444

In [10]:
print(f"{total:,d}")

624,444


Number of lines in the full dataset: 1,076,166,433

# Filters

https://docs.opendata.aws/noaa-ghcn-pds/readme.html

https://www1.ncdc.noaa.gov/pub/data/ghcn/daily/readme.txt

Keep only the relevant elements, described below.

ELEMENT Summary

The five core elements are:

    PRCP = Precipitation (tenths of mm)
    SNOW = Snowfall (mm)
    SNWD = Snow depth (mm)
    TMAX = Maximum temperature (tenths of degrees C)
    TMIN = Minimum temperature (tenths of degrees C)
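The raw values are integers in the units listed above. A minimal sketch of converting them to mm and °C, assuming values arrive either as the space-padded strings seen in the CSV files (e.g. `"   20"`) or as integers, that `-9999` marks a missing value (as the readme states for VALUE1) and that an empty field means no value. `SCALE` and `to_physical` are hypothetical names.

```python
# Divisors turning raw GHCN-Daily integers into physical units (per the readme)
SCALE = {
    "PRCP": 10,  # tenths of mm -> mm
    "SNOW": 1,  # already mm
    "SNWD": 1,  # already mm
    "TMAX": 10,  # tenths of degrees C -> degrees C
    "TMIN": 10,  # tenths of degrees C -> degrees C
}

MISSING = -9999  # missing-value marker from the readme


def to_physical(element, raw):
    """Convert one raw value (str or int) of a core element; None if missing or empty."""
    if raw is None or str(raw).strip() == "":
        return None
    value = int(str(raw).strip())  # drop the padding spaces
    if value == MISSING:
        return None
    return value / SCALE[element]


print(to_physical("PRCP", "  122"))  # 12.2 (mm)
print(to_physical("TMIN", "  -39"))  # -3.9 (degrees C)
```

Keeping the raw integers until the last step, as the notebook does, avoids rounding noise in sums and averages; the conversion can be applied to the aggregated results instead.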

    Variable   Columns   Type
    ------------------------------
    ID            1-11   Character
    YEAR         12-15   Integer
    MONTH        16-17   Integer
    ELEMENT      18-21   Character
    VALUE1       22-26   Integer
    MFLAG1       27-27   Character
    QFLAG1       28-28   Character
		   
VALUE1     is the value on the first day of the month (missing = -9999).
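The table uses 1-based, inclusive column numbers. A sketch that reads exactly the fields listed in the table (first day of the month only) from one fixed-width record; `FirstDay` and `parse_first_day` are hypothetical names and the sample record below is made up.

```python
from typing import NamedTuple, Optional


class FirstDay(NamedTuple):
    station_id: str
    year: int
    month: int
    element: str
    value1: Optional[int]  # None when VALUE1 is the missing marker -9999
    mflag1: str  # "" when blank
    qflag1: str  # "" when blank


MISSING = -9999


def parse_first_day(line: str) -> FirstDay:
    """Parse the fields of the table above from one fixed-width record."""
    # Readme columns are 1-based and inclusive; Python slices are 0-based, end-exclusive
    value1 = int(line[21:26])  # columns 22-26; int() ignores the padding spaces
    return FirstDay(
        station_id=line[0:11],  # columns 1-11
        year=int(line[11:15]),  # columns 12-15
        month=int(line[15:17]),  # columns 16-17
        element=line[17:21],  # columns 18-21
        value1=None if value1 == MISSING else value1,
        mflag1=line[26:27].strip(),  # column 27
        qflag1=line[27:28].strip(),  # column 28
    )


# A made-up record assembled field by field to show the column positions
record = "ACW00011604" + "1949" + "01" + "PRCP" + "   12" + "T" + " "
print(parse_first_day(record))
```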

      "PRCP_ATTRIBUTES" = a,M,Q,S where:
           a = DaysMissing (Numeric value): The number of days (from 1 to 5) missing or flagged is provided   
           M = GHCN-Daily Dataset Measurement Flag (see Section 1.3.a.ii for more details) 
           Q = GHCN-Daily Dataset Quality Flag (see Section 1.3.a.iii for more details)
           S = GHCN-Daily Dataset Source Code (see Section 1.3.a.iv for more details)  
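In the CSV files used below, the flags are packed into `*_ATTRIBUTES` columns such as `",,E"` or `"H,,S"`. A sketch that splits such a string: four parts are read with the `a,M,Q,S` layout above, and three parts are assumed to be `M,Q,S` without the day count. That three-part reading is an assumption based on the daily records shown later in this notebook (e.g. `PRCP_ATTRIBUTES=',,E'`), not on the readme. `parse_attributes` is hypothetical.

```python
def parse_attributes(attributes):
    """Split an *_ATTRIBUTES string into its parts; empty parts become None.

    Four parts are read as a,M,Q,S; three parts are assumed to be M,Q,S.
    """
    result = {"days_missing": None, "mflag": None, "qflag": None, "source": None}
    if not attributes:  # no value recorded at all
        return result
    parts = [p or None for p in attributes.split(",")]
    if len(parts) == 4:
        days, result["mflag"], result["qflag"], result["source"] = parts
        result["days_missing"] = int(days) if days is not None else None
    elif len(parts) == 3:
        result["mflag"], result["qflag"], result["source"] = parts
    else:
        raise ValueError(f"unexpected attribute string: {attributes!r}")
    return result


print(parse_attributes(",,E"))
print(parse_attributes("H,,S"))
```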

MFLAG1     is the measurement flag for the first day of the month. There are
           ten possible values:

           Blank = no measurement information applicable
           B     = precipitation total formed from two 12-hour totals
           D     = precipitation total formed from four six-hour totals
           H     = represents highest or lowest hourly temperature (TMAX or TMIN)
                   or the average of hourly values (TAVG)
           K     = converted from knots
           L     = temperature appears to be lagged with respect to reported
                   hour of observation
           O     = converted from oktas
           P     = identified as "missing presumed zero" in DSI 3200 and 3206
           T     = trace of precipitation, snowfall, or snow depth
           W     = converted from 16-point WBAN code (for wind direction)
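The measurement flags translate naturally into a lookup table. A sketch, with a blank flag stored as the empty string; `MFLAG_MEANINGS` and `describe_mflag` are hypothetical names.

```python
# Measurement flags from the list above; "" stands for a blank flag
MFLAG_MEANINGS = {
    "": "no measurement information applicable",
    "B": "precipitation total formed from two 12-hour totals",
    "D": "precipitation total formed from four six-hour totals",
    "H": "highest or lowest hourly temperature (TMAX or TMIN) or average of hourly values (TAVG)",
    "K": "converted from knots",
    "L": "temperature appears to be lagged with respect to reported hour of observation",
    "O": "converted from oktas",
    "P": 'identified as "missing presumed zero" in DSI 3200 and 3206',
    "T": "trace of precipitation, snowfall, or snow depth",
    "W": "converted from 16-point WBAN code (for wind direction)",
}


def describe_mflag(flag):
    """Meaning of a measurement flag; None and spaces count as blank."""
    return MFLAG_MEANINGS.get((flag or "").strip(), "unknown flag")


# TAVG values in the French files carry "H,,S" attributes
print(describe_mflag("H"))
```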

QFLAG1     is the quality flag for the first day of the month. There are
           fifteen possible values (blank plus fourteen flags):

           Blank = did not fail any quality assurance check
           D     = failed duplicate check
           G     = failed gap check
           I     = failed internal consistency check
           K     = failed streak/frequent-value check
           L     = failed check on length of multiday period
           M     = failed megaconsistency check
           N     = failed naught check
           O     = failed climatological outlier check
           R     = failed lagged range check
           S     = failed spatial consistency check
           T     = failed temporal consistency check
           W     = temperature too warm for snow
           X     = failed bounds check
           Z     = flagged as a result of an official Datzilla
                   investigation
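Since a blank quality flag means that every check passed, keeping only clean values is a one-line filter. A sketch on `(date, value, qflag)` tuples that also counts why the other values were rejected; the record layout, the function names and the sample records are hypothetical.

```python
from collections import Counter

# Quality flags from the list above (a blank flag means every check passed)
QFLAG_MEANINGS = {
    "D": "failed duplicate check",
    "G": "failed gap check",
    "I": "failed internal consistency check",
    "K": "failed streak/frequent-value check",
    "L": "failed check on length of multiday period",
    "M": "failed megaconsistency check",
    "N": "failed naught check",
    "O": "failed climatological outlier check",
    "R": "failed lagged range check",
    "S": "failed spatial consistency check",
    "T": "failed temporal consistency check",
    "W": "temperature too warm for snow",
    "X": "failed bounds check",
    "Z": "flagged as a result of an official Datzilla investigation",
}


def passed_qa(qflag):
    """True when the quality flag is blank (None, '' or spaces)."""
    return (qflag or "").strip() == ""


def split_by_quality(records):
    """records: (date, value, qflag) tuples -> (kept records, Counter of failed checks)."""
    kept = [r for r in records if passed_qa(r[2])]
    failures = Counter(
        QFLAG_MEANINGS.get(q.strip(), "unknown flag")
        for _, _, q in records
        if not passed_qa(q)
    )
    return kept, failures


# Made-up records
records = [
    ("2016-01-01", 5, ""),
    ("2016-01-02", 9999, "X"),
    ("2016-01-03", 0, " "),
    ("2016-01-04", 400, "O"),
]
kept, failures = split_by_quality(records)
print(kept)
print(failures)
```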
    
- WESF = Water equivalent of snowfall (tenths of mm)

WV** = Weather in the Vicinity where ** has one of the following values:

    01 = Fog, ice fog, or freezing fog (may include heavy fog)
    03 = Thunder
    07 = Ash, dust, sand, or other blowing obstruction
    18 = Snow or ice crystals
    20 = Rain or snow shower
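The two digits after `WV` select the phenomenon, so decoding an element name is a lookup on its last two characters. A sketch limited to the codes listed above; `WV_CODES` and `describe_wv` are hypothetical.

```python
# Codes of the WV** ("Weather in the Vicinity") elements listed above
WV_CODES = {
    "01": "Fog, ice fog, or freezing fog (may include heavy fog)",
    "03": "Thunder",
    "07": "Ash, dust, sand, or other blowing obstruction",
    "18": "Snow or ice crystals",
    "20": "Rain or snow shower",
}


def describe_wv(element):
    """Return the meaning of a WV** element name such as 'WV03', or None."""
    if len(element) == 4 and element.startswith("WV"):
        return WV_CODES.get(element[2:])
    return None


for element in ["WV03", "WV20", "PRCP"]:
    print(element, describe_wv(element))
```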

WMO ID is the World Meteorological Organization (WMO) number for the station. If the station has no WMO number (or one has not yet been matched to this station), then the field is blank.

     

HCN/CRN FLAG = flag that indicates whether the station is part of the U.S. Historical Climatology Network (HCN) or the U.S. Climate Reference Network (CRN). There are three possible values:

    Blank = Not a member of the U.S. Historical Climatology or U.S. Climate Reference Networks
    HCN = U.S. Historical Climatology Network station
    CRN = U.S. Climate Reference Network or U.S. Regional Climate Network Station


In [46]:
!head /media/data-nvme/dev/datasets/WorldBank/noaa/ASN00060066.csv


In [None]:
!head $DATASET_FOLDER/noaa/ASN00060066.csv
!wc -l $DATASET_FOLDER/noaa/ASN00060066.csv
!grep 2016 $DATASET_FOLDER/noaa/AE000041196.csv | head -1

In [12]:
!head -3 /media/data-nvme/dev/datasets/WorldBank/france/FRM00007180.csv

"STATION","DATE","LATITUDE","LONGITUDE","ELEVATION","NAME","PRCP","PRCP_ATTRIBUTES","SNWD","SNWD_ATTRIBUTES","TMAX","TMAX_ATTRIBUTES","TMIN","TMIN_ATTRIBUTES","TAVG","TAVG_ATTRIBUTES"
"FRM00007180","1944-11-03","48.692","6.23","228.9","ESSEY, FR",,,,,,,,,"   49","H,,S"
"FRM00007180","1944-11-04","48.692","6.23","228.9","ESSEY, FR",,,,,,,,,"   58","H,,S"


In [51]:
!rm -r /media/data-nvme/dev/datasets/WorldBank/year_2016-01

In [15]:
s = '"FRM00007180","1944-11-03","48.692","6.23","228.9","ESSEY, FR",,,,,,,,,"   49","H,,S"'
s[15:20]

'1944-'
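Slicing at fixed offsets works because the station ID always has 11 characters and every field is quoted. As a cross-check, Python's standard `csv` module splits on commas outside quotes (so `"ESSEY, FR"` stays one field) and removes the quotes. A sketch for verifying the offsets locally, not what the Spark jobs below use; `parse_noaa_line` is hypothetical.

```python
import csv
import io


def parse_noaa_line(line):
    """Parse one quoted NOAA CSV line into a list of string fields."""
    return next(csv.reader(io.StringIO(line)))


s = '"FRM00007180","1944-11-03","48.692","6.23","228.9","ESSEY, FR",,,,,,,,,"   49","H,,S"'
fields = parse_noaa_line(s)
station_id, date, name = fields[0], fields[1], fields[5]
print(station_id, date[:4], name)  # FRM00007180 1944 ESSEY, FR
# The fixed offsets used in this notebook pick the same values
print(s[1:12] == station_id, s[15:19] == date[:4])  # True True
```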

# Filter on Rain - Join version

113,074 files contain rain data, so there is no need to filter them before the other operations.

In [None]:
%%time
import shutil

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

DATASET_FOLDER = "/media/data-nvme/dev/datasets/WorldBank/"
SPARK_MASTER = "spark://192.168.0.9:7077"
APP_NAME = "Group daily rain by Country"

# noaa_csv_path = DATASET_FOLDER + "noaa/*.csv"  # full dataset
noaa_csv_path = DATASET_FOLDER + "noaa/ASN*.csv"  # ASN* stations only (subset)
output = DATASET_FOLDER + "daily_rain_by_country"

# Create (or reuse) the Spark session
spark = SparkSession.builder.master(SPARK_MASTER).appName(APP_NAME).getOrCreate()
# Load every CSV file into one DataFrame (all columns are read as strings)
df = (
    spark.read.format("csv")
    .option("header", True)
    .option("multiLine", True)
    .load(noaa_csv_path)
)
# Keep only the station metadata and the precipitation columns
df_rain = df.select(
    "STATION",
    "DATE",
    "LATITUDE",
    "LONGITUDE",
    "ELEVATION",
    "NAME",
    "PRCP",
    "PRCP_ATTRIBUTES",
)

# PRCP is a space-padded string such as "   20" (tenths of mm):
# trim and cast it so that max() compares numbers, not strings
df_rain = df_rain.withColumn("PRCP", F.trim(F.col("PRCP")).cast("int"))

# Extract the country code: the last two characters of NAME ("ESSEY, FR" -> "FR")
df_rain = df_rain.withColumn(
    "COUNTRY", F.col("NAME").substr(F.length("NAME") - 1, F.length("NAME"))
)

# Register the DataFrame as a SQL temporary view
df_rain.createOrReplaceTempView("noaa")
sqlDF = spark.sql(
    """
    SELECT DATE, COUNTRY,
           ceil(100 * avg(PRCP)) / 100    AS avg_PRCP,
           ceil(100 * sum(PRCP)) / 100    AS sum_PRCP,
           max(PRCP)                      AS max_PRCP,
           ceil(100 * stddev(PRCP)) / 100 AS stddev_PRCP,
           count(PRCP)                    AS count_PRCP
    FROM noaa
    GROUP BY DATE, COUNTRY
    """
)

# Spark will not write into an existing directory: remove any previous run
shutil.rmtree(output, ignore_errors=True)
# sqlDF.repartition(1).write.csv(output)  # single output file, small results only
sqlDF.write.csv(output)
print(sqlDF.columns)
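What the query computes for each (DATE, COUNTRY) group can be checked by hand on a few rows. A plain-Python sketch under stated assumptions: Spark's `stddev` is the sample standard deviation (`stddev_samp`), mirrored here with `statistics.stdev`; `ceil(100 * x) / 100` rounds up to two decimals; null PRCP values are skipped, as `count(PRCP)` does. Simplification: a group whose values are all null is dropped here, whereas SQL would still return it. `aggregate_rain` and the sample rows are hypothetical.

```python
import math
import statistics
from collections import defaultdict


def ceil2(x):
    """Round up to two decimals, like ceil(100 * x) / 100 in the query."""
    return math.ceil(100 * x) / 100


def aggregate_rain(rows):
    """rows: iterable of (date, country, prcp) with prcp an int or None."""
    groups = defaultdict(list)
    for date, country, prcp in rows:
        if prcp is not None:  # aggregate functions ignore nulls
            groups[(date, country)].append(prcp)
    result = {}
    for key, values in groups.items():
        result[key] = {
            "avg_PRCP": ceil2(statistics.mean(values)),
            "sum_PRCP": ceil2(sum(values)),
            "max_PRCP": max(values),
            # the sample standard deviation needs at least two values
            "stddev_PRCP": ceil2(statistics.stdev(values)) if len(values) > 1 else None,
            "count_PRCP": len(values),
        }
    return result


rows = [
    ("2016-01-01", "FR", 5),
    ("2016-01-01", "FR", 15),
    ("2016-01-01", "FR", None),
    ("2016-01-01", "US", 39),
]
for key, stats in aggregate_rain(rows).items():
    print(key, stats)
```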

In [9]:
!rm -r /media/data-nvme/dev/datasets/WorldBank/year_2016-01_rain

In [27]:
%%time
# noaa_csv_path = DATASET_FOLDER + '/noaa/*.csv'
# noaa_csv_path = DATASET_FOLDER + 'year_2016-fra'
noaa_csv_path = DATASET_FOLDER + "/france/*.csv"
inventory_path = DATASET_FOLDER + "ghcnd-inventory.txt"
output = DATASET_FOLDER + "france_rain"
# configuration
APP_NAME = "Filter on Rain"

# Create (or reuse) the Spark session
spark = SparkSession.builder.master(SPARK_MASTER).appName(APP_NAME).getOrCreate()
# Load every CSV file into one DataFrame
df = (
    spark.read.format("csv")
    .option("header", True)
    .option("multiLine", True)
    .load(noaa_csv_path)
)
# Header lines are not records, so this is lower than the raw line count
print(f"Record count is: {df.count()}")

Record count is: 624408
CPU times: user 2.53 ms, sys: 3.69 ms, total: 6.21 ms
Wall time: 5.84 s


In [34]:
'","'.join(df.columns)

'STATION","DATE","LATITUDE","LONGITUDE","ELEVATION","NAME","PRCP","PRCP_ATTRIBUTES","SNWD","SNWD_ATTRIBUTES","TMAX","TMAX_ATTRIBUTES","TMIN","TMIN_ATTRIBUTES","TAVG","TAVG_ATTRIBUTES'

In [41]:
# Keep only the station metadata and the precipitation columns
df_rain = df.select(
    "STATION",
    "DATE",
    "LATITUDE",
    "LONGITUDE",
    "ELEVATION",
    "NAME",
    "PRCP",
    "PRCP_ATTRIBUTES",
)
df_rain.columns

['STATION',
 'DATE',
 'LATITUDE',
 'LONGITUDE',
 'ELEVATION',
 'NAME',
 'PRCP',
 'PRCP_ATTRIBUTES']

In [42]:
import pyspark.sql.functions as F

# Extract the country code: the last two characters of NAME
# ("BELLE ILE LE TALUT, FR" -> "FR"); substr() positions are 1-based
df_rain = df_rain.withColumn(
    "COUNTRY", F.col("NAME").substr(F.length("NAME") - 1, F.length("NAME"))
)
df_rain.head(3)

[Row(STATION='FRM00007207', DATE='1930-04-01', LATITUDE='47.2942', LONGITUDE='-3.2181', ELEVATION='34.0', NAME='BELLE ILE LE TALUT, FR', PRCP='   20', PRCP_ATTRIBUTES=',,E', COUNTRY='FR'),
 Row(STATION='FRM00007207', DATE='1930-04-02', LATITUDE='47.2942', LONGITUDE='-3.2181', ELEVATION='34.0', NAME='BELLE ILE LE TALUT, FR', PRCP='  122', PRCP_ATTRIBUTES=',,E', COUNTRY='FR'),
 Row(STATION='FRM00007207', DATE='1930-04-03', LATITUDE='47.2942', LONGITUDE='-3.2181', ELEVATION='34.0', NAME='BELLE ILE LE TALUT, FR', PRCP='   93', PRCP_ATTRIBUTES=',,E', COUNTRY='FR')]
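The `substr` call keeps the last two characters of `NAME`. A plain-Python equivalent, next to a second option: in GHCN-Daily the first two characters of the station ID are the FIPS country code, so both should agree for the stations shown here. These are FIPS 10-4 codes (for example `GM` is Germany), not ISO 3166 codes, which is worth keeping in mind for the `country_ISO2` column used below. `country_from_name` and `country_from_id` are hypothetical helpers.

```python
def country_from_name(name):
    """Last two characters of the station name, as the Spark substr() call does."""
    return name[-2:]


def country_from_id(station_id):
    """First two characters of a GHCN-Daily station ID (FIPS country code)."""
    return station_id[:2]


for station_id, name in [
    ("FRM00007207", "BELLE ILE LE TALUT, FR"),
    ("USR0000AFRA", "FRAZIER WELLS ARIZONA, AZ US"),
]:
    print(station_id, country_from_name(name), country_from_id(station_id))
```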

In [48]:
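output = DATASET_FOLDER + "france_rain"
# Gather all partitions into a single CSV file (fine for this small French subset);
# "overwrite" replaces the output of a previous run
df_rain.repartition(1).write.mode("overwrite").csv(output)
# df_rain.write.csv(output)
df_rain.columns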

['STATION',
 'DATE',
 'LATITUDE',
 'LONGITUDE',
 'ELEVATION',
 'NAME',
 'PRCP',
 'PRCP_ATTRIBUTES',
 'COUNTRY']

In [4]:
import pandas as pd

# df_out = pd.read_csv('/media/data-nvme/dev/datasets/WorldBank/daily_rain_by_country.csv.gz')
df_out = pd.read_csv(
    "/media/data-nvme/dev/src/batch8_worldbank/datasets/daily_rain_by_country.csv.gz"
)
df_out.head(3)

Unnamed: 0,date,country_ISO2,avg_rain,sum_rain,max_rain,stddev_rain,station_count
0,1782-01-04,GM,4.0,4.0,4.0,0.0,1
1,1782-02-07,GM,13.0,13.0,13.0,0.0,1
2,1782-08-13,GM,29.0,29.0,29.0,0.0,1


In [5]:
df_out.tail(10)

Unnamed: 0,date,country_ISO2,avg_rain,sum_rain,max_rain,stddev_rain,station_count
5215939,2020-10-14,VQ,64.0,64.0,64.0,0.0,1
5215940,2020-10-16,CQ,28.0,56.0,56.0,39.6,2
5215941,2020-10-16,HO,0.0,0.0,0.0,0.0,2
5215942,2020-10-19,CH,13.23,2659.0,442.0,50.87,201
5215943,2020-10-21,RS,11.13,4993.0,201.0,22.38,449
5215944,2020-10-27,BC,0.0,0.0,0.0,0.0,3
5215945,2020-10-29,CF,136.17,817.0,470.0,167.91,6
5215946,2020-10-29,MY,25.6,384.0,216.0,57.43,15
5215947,2020-10-30,NL,67.8,339.0,99.0,29.26,5
5215948,2020-10-30,RO,13.82,152.0,84.0,24.68,11


In [105]:
df_out.columns
df_out.columns = [
    "date",
    "country_ISO2",
    "avg_rain",
    "sum_rain",
    "max_rain",
    "stddev_rain",
    "station_count",
]
df_out.columns

Index(['date', 'country_ISO2', 'avg_rain', 'sum_rain', 'max_rain',
       'stddev_rain', 'station_count'],
      dtype='object')

Index(['date', 'country_ISO2', 'avg_rain', 'sum_rain', 'max_rain',
       'stddev_rain', 'station_count'],
      dtype='object')

In [106]:
df_out = df_out.dropna()
df_out.to_csv(
    "/media/data-nvme/dev/src/batch8_worldbank/datasets/daily_rain_by_country.csv.gz",
    compression="gzip",
    index=False,
)

In [68]:
import pandas as pd

df_out = pd.read_csv("/media/data-nvme/dev/datasets/WorldBank/france-rain.csv")
df_out.head(3)

Unnamed: 0,STATION,DATE,LATITUDE,LONGITUDE,ELEVATION,NAME,PRCP,PRCP_ATTRIBUTES,COUNTRY
0,FRM00007207,1930-04-01,47.2942,-3.2181,34.0,"BELLE ILE LE TALUT, FR",20.0,",,E",FR
1,FRM00007207,1930-04-02,47.2942,-3.2181,34.0,"BELLE ILE LE TALUT, FR",122.0,",,E",FR
2,FRM00007207,1930-04-03,47.2942,-3.2181,34.0,"BELLE ILE LE TALUT, FR",93.0,",,E",FR


In [76]:
df_test = df_out.query('DATE == "2016-04-01"')
df_test.describe()

Unnamed: 0,LATITUDE,LONGITUDE,ELEVATION,PRCP
count,36.0,36.0,36.0,36.0
mean,43.828025,4.259108,197.538889,43.972222
std,16.152703,11.882491,222.977693,59.728666
min,-49.35,-5.05,3.7,0.0
25%,44.06575,0.08625,55.475,0.0
50%,46.869,2.7006,113.55,12.5
75%,48.508,5.3015,262.425,78.0
max,50.562,70.25,876.0,201.0


In [86]:
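# Register the DataFrame as a SQL temporary view
df_rain.createOrReplaceTempView("noaa")
# PRCP is still a string column here: avg/sum/stddev cast it implicitly, but
# max(PRCP) compares strings (see max_PRCP='   24' below);
# use max(CAST(trim(PRCP) AS INT)) for a numeric maximum
sqlDF = spark.sql(
    "SELECT DATE, COUNTRY, ceil(100 * avg(PRCP))/100 as avg_PRCP, ceil(100 * sum(PRCP))/100 as sum_PRCP, max(PRCP) as max_PRCP, ceil(100 * stddev(PRCP))/100 as stddev_PRCP, count(PRCP) as count_PRCP FROM noaa GROUP BY DATE, COUNTRY;"
)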

In [87]:
sqlDF.head(3)

[Row(DATE='1922-04-17', COUNTRY='FR', avg_PRCP=24.0, sum_PRCP=24.0, max_PRCP='   24', stddev_PRCP=0.0, count_PRCP=1),
 Row(DATE='1923-01-13', COUNTRY='FR', avg_PRCP=0.0, sum_PRCP=0.0, max_PRCP='    0', stddev_PRCP=0.0, count_PRCP=1),
 Row(DATE='1923-03-20', COUNTRY='FR', avg_PRCP=0.0, sum_PRCP=0.0, max_PRCP='    0', stddev_PRCP=0.0, count_PRCP=1)]
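`max_PRCP` comes back as a string (`'   24'`) because `max` on a string column compares text. A small Python illustration of when that matters: with equal-width, right-aligned padding (all values shown in this notebook are five characters wide), text order happens to match numeric order for non-negative values, but not for negative values such as TMIN, nor for unpadded numbers. Converting to integers first (`key=int` here, a `CAST` in SQL) avoids the problem.

```python
# Values as they appear in the CSV files: space-padded strings
prcp = ["   24", "  122", "    0"]
tmin = ["  -39", "  -10"]
unpadded = ["9", "10"]

# Non-negative, equally padded values: text order matches numeric order
print(repr(max(prcp)), repr(max(prcp, key=int)))
# Negative values: here the text maximum is the numeric minimum
print(repr(max(tmin)), repr(max(tmin, key=int)))
# Unpadded numbers: "9" sorts after "10" as text
print(repr(max(unpadded)), repr(max(unpadded, key=int)))
```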

In [88]:
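output = DATASET_FOLDER + "france_rain_avg_by_day"
# Remove a previous run, if any (Spark will not write into an existing directory)
shutil.rmtree(output, ignore_errors=True)
# Gather all partitions into a single CSV file
sqlDF.repartition(1).write.csv(output)
# sqlDF.write.csv(output)
sqlDF.columns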

['DATE',
 'COUNTRY',
 'avg_PRCP',
 'sum_PRCP',
 'max_PRCP',
 'stddev_PRCP',
 'count_PRCP']

In [89]:
",".join(sqlDF.columns)

'DATE,COUNTRY,avg_PRCP,sum_PRCP,max_PRCP,stddev_PRCP,count_PRCP'

In [None]:
# Reuse the running SparkContext if there is one (only one can be active at a time)
conf = SparkConf().setAppName("Filter on Rain - join").setMaster(SPARK_MASTER)
sc = SparkContext.getOrCreate(conf=conf)

# DataFrame alternative (needs the SparkSession `spark`, not a SparkContext):
# full_df = spark.read.option("header", True).option("inferSchema", True).csv(noaa_csv_path)

# Load the station inventory (fixed-width text, one line per station and element)
inventory_rdd = sc.textFile(inventory_path)
# Keep only the stations that report precipitation
rain_inventory_rdd = inventory_rdd.filter(lambda s: "PRCP" in s)
# Format each line as a (key, value) pair to prepare the join:
# key = station ID (columns 1-11), value = first and last year, e.g. "1973 2020"
rain_inventory_rdd = rain_inventory_rdd.map(lambda line: (line[0:11], line[36:]))

# Load the station data points; noaa_csv_path must be a directory here,
# since "/*" reads every file (e.g. every Spark part file) it contains
print(f"Processing {noaa_csv_path}...")
all_data_rdd = sc.textFile(noaa_csv_path + "/*")
# key = station ID without its quotes, value = the rest of the line from DATE on
all_data_rdd = all_data_rdd.map(lambda line: (line[1:12], line[14:]))
# Inner join: keep only the data points of stations reporting precipitation
join = rain_inventory_rdd.join(all_data_rdd)

# Save precipitation data
output = DATASET_FOLDER + "year_2016-01_rain"
print(f"Saving to {output}")
# Flatten (station, (years, data)) back into a CSV line: "station,data"
join_output = join.map(lambda x: ",".join([x[0], x[1][1]]))
# Gather all partitions into one to get a single file (don't do it for a huge dataset)
join_output = join_output.repartition(1)

shutil.rmtree(output, ignore_errors=True)
join_output.saveAsTextFile(output)
total = join.count()
sc.stop()

total
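The inventory join can be followed step by step in plain Python. A sketch, not the Spark job itself: a dict stands in for the keyed inventory RDD, which assumes the inventory lists PRCP at most once per station (a Spark `join` would emit one output line per matching pair). The slices are the ones used above: `line[0:11]` and `line[36:]` for the inventory, `line[1:12]` and `line[14:]` for the quoted CSV lines. `join_rain` is hypothetical, and the years in the sample inventory lines are illustrative.

```python
def join_rain(inventory_lines, data_lines):
    """Keep the data lines of stations whose inventory lists PRCP, as 'ID,rest'."""
    # station ID -> "first_year last_year" for every PRCP inventory entry
    rain_inventory = {
        line[0:11]: line[36:] for line in inventory_lines if "PRCP" in line
    }
    joined = []
    for line in data_lines:
        station_id, rest = line[1:12], line[14:]  # drop the quotes around the ID
        if station_id in rain_inventory:  # inner join on the station ID
            joined.append(",".join([station_id, rest]))
    return joined


inventory = [
    "FRM00007149  48.7167    2.3842 PRCP 1973 2020",  # illustrative years
    "USR0000AFRA  35.8456 -113.0550 TMAX 2003 2020",  # no PRCP entry for this station
]
data = [
    '"USR0000AFRA","2016-01-25","35.8456","-113.055","2063.5","FRAZIER WELLS ARIZONA, AZ US","   39","H,,U","  -39","H,,U","  -10",",,U"',
    '"FRM00007149","2016-01-01","48.7167","2.3842","89.0","ORLY, FR","   10",",,E",,,"   85",",,E","   38",",,E","   62","H,,S"',
]
for line in join_rain(inventory, data):
    print(line)
```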

# Filter on Year

In [16]:
%%time
# noaa_csv_path = '/media/data-nvme/dev/datasets/WorldBank//noaa/ASN00060066.csv'
# noaa_csv_path = DATASET_FOLDER + '/noaa/AE*.csv'
# noaa_csv_path = DATASET_FOLDER + '/noaa/*.csv'
# noaa_csv_path = DATASET_FOLDER + '/small_dataset/*.csv'
noaa_csv_path = DATASET_FOLDER + "/france/*.csv"


def filter_year():
    total = 0
    # configuration
    APP_NAME = "Filter on Year"
    conf = SparkConf().setAppName(APP_NAME).setMaster(SPARK_MASTER)
    sc = SparkContext(conf=conf)
    try:
        # core part of the script
        print(f"Processing {noaa_csv_path}...")
        files_rdd = sc.textFile(noaa_csv_path)
        # s[15:20] is the year plus its dash: "FRM00007180","2016-...
        year_2016 = files_rdd.filter(lambda s: s[15:20] == "2016-")
        output = DATASET_FOLDER + "year_2016-fra"
        print(f"Saving to {output}")
        year_2016.saveAsTextFile(output)
        total = year_2016.count()
    except Exception as inst:
        print("ERROR")
        print(type(inst))  # the exception instance
        print(inst.args)  # arguments stored in .args
        print(inst)
        raise
    finally:
        sc.stop()
    # Return after `finally`: a return inside it would swallow the re-raised exception
    return total


total = filter_year()
total

Processing /media/data-nvme/dev/datasets/WorldBank//france/*.csv...
Saving to /media/data-nvme/dev/datasets/WorldBank/year_2016-fra
CPU times: user 28.6 ms, sys: 5.06 ms, total: 33.7 ms
Wall time: 5.43 s


13105
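The year filter boils down to a slice comparison on each raw line. A plain-Python sketch on shortened sample lines, assuming the quoted 11-character station IDs seen above; `filter_year_lines` is a hypothetical name. Note that `s[15:20] == "2016-" in s` is a chained comparison meaning `(s[15:20] == "2016-") and ("2016-" in s)`; the second test is implied by the first, so `s[15:20] == "2016-"` alone keeps the same lines.

```python
def filter_year_lines(lines, year):
    """Keep the lines whose DATE field starts with the given year."""
    prefix = f"{year}-"
    # Header lines fail the test: their characters 15-19 are '","LA'
    return [s for s in lines if s[15:20] == prefix]


# Shortened sample lines (only the first three columns)
lines = [
    '"STATION","DATE","LATITUDE"',
    '"FRM00007180","1944-11-03","48.692"',
    '"FRM00007149","2016-01-01","48.7167"',
]
print(filter_year_lines(lines, 2016))

# The chained form gives the same answer as the explicit "and"
s = lines[2]
print((s[15:20] == "2016-" in s) == (s[15:20] == "2016-" and "2016-" in s))
```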

In [None]:
s = '"USR0000AFRA","2016-01-25","35.8456","-113.055","2063.5","FRAZIER WELLS ARIZONA, AZ US","   39","H,,U","  -39","H,,U","  -10",",,U"'
s[1:12]
s[14:]
s = "ACW00011604  17.1167  -61.7833 TMAX 1949 1949"
s[0:11]
s[36:]

https://stackoverflow.com/questions/56957589/how-to-read-multiple-csv-files-with-different-schema-in-pyspark

In [87]:
!echo '"STATION","DATE","LATITUDE","LONGITUDE","ELEVATION","NAME","PRCP","PRCP_ATTRIBUTES","SNWD","SNWD_ATTRIBUTES","TMAX","TMAX_ATTRIBUTES","TMIN","TMIN_ATTRIBUTES","TAVG","TAVG_ATTRIBUTES"'
!head $DATASET_FOLDER/year_2016-01_rain/part-00000

"STATION","DATE","LATITUDE","LONGITUDE","ELEVATION","NAME","PRCP","PRCP_ATTRIBUTES","SNWD","SNWD_ATTRIBUTES","TMAX","TMAX_ATTRIBUTES","TMIN","TMIN_ATTRIBUTES","TAVG","TAVG_ATTRIBUTES"
FRM00007005,"2016-01-01","50.143","1.832","67.1","ABBEVILLE, FR","    5",",,S",,,"   88",",,S","   21",",,S","   56","H,,S"
FRM00007180,"2016-01-01","48.692","6.23","228.9","ESSEY, FR","   15",",,S",,,"   60",",,S","    9",",,S","   44","H,,S"
FRM00007535,"2016-01-01","44.745","1.3967","260.0","GOURDON, FR","   12",",,E",,,"  137",",,E","    9",",,E","   84","H,,S"
FRM00007607,"2016-01-01","43.912","-0.508","61.9","MONT DE MARSAN, FR","    5",",,S",,,,,"    9",",,S","   77","H,,S"
FRM00007240,"2016-01-01","47.432","0.728","108.8","VAL DE LOIRE, FR","    0",",,S",,,"   96",",,S",,,"   68","H,,S"
FRM00007790,"2016-01-01","42.553","9.484","7.9","PORETTA, FR","    0",",,S",,,"  133",",,S",,,"  107","H,,S"
FRM00007558,"2016-01-01","44.117","3.017","720.0","MILLAU, FR","   48",",,S",,,,,"   37",",,S","   66","H

In [74]:
sc.parallelize(join.take(2)).collect()

[('FRM00007005',
  ('1973 2020',
   '"2016-01-01","50.143","1.832","67.1","ABBEVILLE, FR","    5",",,S",,,"   88",",,S","   21",",,S","   56","H,,S"')),
 ('FRM00007180',
  ('1973 2020',
   '"2016-01-01","48.692","6.23","228.9","ESSEY, FR","   15",",,S",,,"   60",",,S","    9",",,S","   44","H,,S"'))]

['FRM00007005,"2016-01-01","50.143","1.832","67.1","ABBEVILLE, FR","    5",",,S",,,"   88",",,S","   21",",,S","   56","H,,S"',
 'FRM00007180,"2016-01-01","48.692","6.23","228.9","ESSEY, FR","   15",",,S",,,"   60",",,S","    9",",,S","   44","H,,S"']

36

"ORLY" ID : 0-20000-0-07149

Coordinates: 48.7166666667°N, 2.3844444444°E, 89 m

In [85]:
# !cd $DATASET_FOLDER/year_2016-01_rain && (ls | xargs cat)  > ../year_2016-01_rain.csv
# ! head $DATASET_FOLDER/year_2016-01_rain.csv

In [None]:
# noaa_csv_path = '/media/data-nvme/dev/datasets/WorldBank//noaa/ASN00060066.csv'
# noaa_csv_path = DATASET_FOLDER + '/noaa/AE*.csv'
# noaa_csv_path = DATASET_FOLDER + '/noaa/*.csv'
noaa_csv_path = DATASET_FOLDER + "year_2016-01.csv"


def extract_rain():
    total = 0
    # configuration
    APP_NAME = "Count Rain"
    conf = SparkConf().setAppName(APP_NAME).setMaster(SPARK_MASTER)
    sc = SparkContext(conf=conf)
    try:
        # core part of the script: count the lines of every file in the directory
        print(f"Processing {noaa_csv_path}...")
        files_rdd = sc.textFile(noaa_csv_path + "/*")
        total = files_rdd.count()
    except Exception as inst:
        print("ERROR")
        print(type(inst))  # the exception instance
        print(inst.args)  # arguments stored in .args
        print(inst)
        raise
    finally:
        sc.stop()
    # Return after `finally`: a return inside it would swallow the re-raised exception
    return total


total = extract_rain()
total
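The helper functions in this notebook wrap their Spark jobs in `try`/`except`/`finally`. In that pattern, a `return` placed inside `finally` silently discards an exception re-raised with `raise`. A minimal, Spark-free demonstration, with the cleanup (such as `sc.stop()`) kept in `finally` and the `return` moved after it; both function names are hypothetical.

```python
def count_with_return_in_finally():
    try:
        raise RuntimeError("job failed")
    except RuntimeError:
        raise  # re-raised...
    finally:
        return 0  # ...but this return discards the exception


def count_with_return_after_finally():
    total = 0
    try:
        raise RuntimeError("job failed")
    except RuntimeError:
        raise
    finally:
        pass  # cleanup such as sc.stop() belongs here
    return total  # only reached when no exception was raised


print(count_with_return_in_finally())  # 0: the RuntimeError is silently lost
try:
    count_with_return_after_finally()
    caught = None
except RuntimeError as err:
    caught = str(err)
print("caught:", caught)  # caught: job failed
```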

In [None]:
!head $DATASET_FOLDER/year_2016.csv

In [None]:
from pyspark.sql.functions import input_file_name
from pyspark.sql.types import StructType, StructField, StringType

# Schema from the Stack Overflow answer above; the columns it elides ("etc.")
# must be listed here, in file order
customSchema = StructType([
    StructField("asset_id", StringType(), True),
    StructField("price_date", StringType(), True),
    # ... remaining columns ...
    StructField("close_price", StringType(), True),
    StructField("filename", StringType(), True),
])

# fullPath: location of the CSV files to read (not defined in this notebook)
df = (
    spark.read.format("csv")
    .option("header", "false")
    .option("sep", "|")
    .schema(customSchema)
    .load(fullPath)
    # Record the source file of every row
    .withColumn("filename", input_file_name())
)
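The Stack Overflow answer linked above deals with CSV files whose columns differ, as the NOAA station files do. Spark applies a user-supplied CSV schema by position (unless `enforceSchema` is set to false), so the same idea is sketched here in plain Python, matching values by header name instead: `csv.DictReader` reads each header, missing columns become `None`, and each row records its file name, loosely mirroring `input_file_name()`. `read_by_column_name` and the two sample files (with made-up values) are hypothetical.

```python
import csv
import os
import tempfile


def read_by_column_name(paths, columns):
    """Read CSV files with differing headers, aligning values by column name.

    Columns a file lacks and empty values become None; a 'filename'
    entry records the source file of each row.
    """
    rows = []
    for path in paths:
        with open(path, newline="") as f:
            for record in csv.DictReader(f):
                row = {col: record.get(col) or None for col in columns}
                row["filename"] = os.path.basename(path)
                rows.append(row)
    return rows


# Two tiny files with made-up values: the US one has no PRCP column
samples = {
    "USR0000AFRA.csv": '"STATION","DATE","TMAX","TMIN"\n'
    '"USR0000AFRA","2016-01-25","   39","  -10"\n',
    "FRM00007149.csv": '"STATION","DATE","PRCP","TMAX","TMIN"\n'
    '"FRM00007149","2016-01-01","   10","   85","   38"\n',
}
columns = ["STATION", "DATE", "PRCP", "TMAX", "TMIN"]

with tempfile.TemporaryDirectory() as folder:
    paths = []
    for name, text in samples.items():
        path = os.path.join(folder, name)
        with open(path, "w", newline="") as f:
            f.write(text)
        paths.append(path)
    rows = read_by_column_name(sorted(paths), columns)

for row in rows:
    print(row)
```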

# TEST

In [5]:
conf = SparkConf().setAppName("test")
conf = conf.setMaster(SPARK_MASTER)
sc = SparkContext(conf=conf)
s = [
    '"USR0000AFRA","2016-01-25","35.8456","-113.055","2063.5","FRAZIER WELLS ARIZONA, AZ US","   39","H,,U","  -39","H,,U","  -10",",,U"'
]
s += [
    '"FRM00007149","2016-01-01","48.7167","2.3842","89.0","ORLY, FR","   10",",,E",,,"   85",",,E","   38",",,E","   62","H,,S"'
]
s

['"USR0000AFRA","2016-01-25","35.8456","-113.055","2063.5","FRAZIER WELLS ARIZONA, AZ US","   39","H,,U","  -39","H,,U","  -10",",,U"',
 '"FRM00007149","2016-01-01","48.7167","2.3842","89.0","ORLY, FR","   10",",,E",,,"   85",",,E","   38",",,E","   62","H,,S"']

In [33]:
rdd = sc.parallelize(s)
# test = rain_inventor_rdd.map(lambda line : (line[0:11], line[36:])) # Keep code and years
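# (station ID, (date, latitude)); note that line[1:11] keeps only the first 10 of
# the 11 ID characters (line[1:12] would keep the full ID)
rdd = rdd.map(lambda line: (line[1:11], (line[15:25], line[28:35])))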
rdd.collect()

[('USR0000AFR', ('2016-01-25', '35.8456')),
 ('FRM0000714', ('2016-01-01', '48.7167'))]

In [34]:
rdd.repartition(1).collect()

[('USR0000AFR', ('2016-01-25', '35.8456')),
 ('FRM0000714', ('2016-01-01', '48.7167'))]

In [40]:
rdd.map(lambda x: ",".join([x[0], x[1][0], x[1][1]])).repartition(1).collect()

['USR0000AFR,2016-01-25,35.8456', 'FRM0000714,2016-01-01,48.7167']

In [41]:
sc.stop()