# Environment preparation

## Download general-purpose datasets

In [1]:
%%bash
# Fetch the archive quietly into data/ds-spark, then unpack it there
wget -P data/ds-spark -q https://storage.googleapis.com/academy-data/ds-with-spark.zip
cd data/ds-spark && unzip ds-with-spark.zip

Archive:  ds-with-spark.zip
  inflating: ted_main.csv            
  inflating: __MACOSX/._ted_main.csv  
  inflating: ted_transcripts.csv     
  inflating: __MACOSX/._ted_transcripts.csv  
  inflating: used_cars.csv           
  inflating: __MACOSX/._used_cars.csv  
  inflating: aac_intakes_outcomes.csv  
  inflating: __MACOSX/._aac_intakes_outcomes.csv  
  inflating: aac_intakes.csv         
  inflating: __MACOSX/._aac_intakes.csv  
  inflating: aac_outcomes.csv        
  inflating: __MACOSX/._aac_outcomes.csv  
  inflating: airlines.parquet        
  inflating: __MACOSX/._airlines.parquet  
  inflating: band1.txt               
  inflating: band2.txt               
  inflating: chicagoAllWeather.csv   
  inflating: __MACOSX/._chicagoAllWeather.csv  
  inflating: chicagoCensus.csv       
  inflating: __MACOSX/._chicagoCensus.csv  
  inflating: chicagoCrimes10k.csv    
  inflating: __MACOSX/._chicagoCrimes10k.csv  
  inflating: complete-day.parquet    
  inflating: __MACOSX/._complete-
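The archive was evidently built on macOS, so every data file is shadowed by a `__MACOSX/._*` resource-fork entry that Spark has no use for. As a minimal sketch (the helper name `extract_without_macosx` is hypothetical), Python's standard `zipfile` module can extract only the real members; from the shell, `unzip ds-with-spark.zip -x '__MACOSX/*'` has the same effect.

```python
import zipfile
from pathlib import Path


def extract_without_macosx(archive, dest):
    """Extract `archive` into `dest`, skipping macOS `__MACOSX/` metadata entries."""
    dest = Path(dest)
    dest.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(archive) as zf:
        # Keep only real data files; the ._* entries are resource-fork residue.
        members = [name for name in zf.namelist() if not name.startswith("__MACOSX/")]
        zf.extractall(dest, members=members)
    return members


# Hypothetical usage, mirroring the cell above:
# extract_without_macosx("data/ds-spark/ds-with-spark.zip", "data/ds-spark")
```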

## Download the weather dataset format description

In [2]:
%%bash
wget -q https://www1.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/ghcn-daily-by_year-format.rtf

In [3]:
%pip install striprtf



In [4]:
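from striprtf.striprtf import rtf_to_text

# Read the raw RTF source and drop its line breaks before conversion
with open('ghcn-daily-by_year-format.rtf', 'r') as file:
    data = file.read().replace('\n', '')

# Strip the RTF control words, leaving plain text
text = rtf_to_text(data)
print(text)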

The following information serves as a definition of each field in one line of data covering one station-day. Each field described below is separated by a comma ( , ) and follows the order
presented in this document.

ID = 11 character station identification code
YEAR/MONTH/DAY = 8 character date in YYYYMMDD format (e.g. 19860529 = May 29, 1986)
ELEMENT = 4 character indicator of element type 
DATA VALUE = 5 character data value for ELEMENT 
M-FLAG = 1 character Measurement Flag 
Q-FLAG = 1 character Quality Flag 
S-FLAG = 1 character Source Flag 
OBS-TIME = 4-character time of observation in hour-minute format (i.e. 0700 =7:00 am)

See section III of the GHCN-Daily readme.txt file for an explanation of ELEMENT codes and their units as well as the M-FLAG, Q-FLAGS and S-FLAGS.

The OBS-TIME field is populated with the observation times contained in NOAA/NCDC’s Multinetwork Metadata System (MMS).  
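The description above is enough to parse a raw line by hand. A minimal sketch, assuming each line is plain comma-separated text in the listed field order and that an empty field means "not set" (the `FIELDS` list and `parse_record` helper are hypothetical names, and the sample line is reconstructed from a row shown further below):

```python
import csv
from datetime import datetime

# Field order as listed in the format description above.
FIELDS = ["ID", "YYYYMMDD", "ELEMENT", "DATA_VALUE",
          "M-FLAG", "Q-FLAG", "S-FLAG", "OBS-TIME"]


def parse_record(line):
    """Parse one comma-separated station-day line into a dict.

    Assumption: empty fields mean "not set" and are mapped to None.
    """
    values = next(csv.reader([line]))
    record = dict.fromkeys(FIELDS)
    record.update({name: (value or None) for name, value in zip(FIELDS, values)})
    # YYYYMMDD -> datetime.date, e.g. 19860529 -> 1986-05-29
    record["YYYYMMDD"] = datetime.strptime(record["YYYYMMDD"], "%Y%m%d").date()
    record["DATA_VALUE"] = int(record["DATA_VALUE"])
    if record["OBS-TIME"] is not None:
        # HHMM -> datetime.time, e.g. 0700 -> 07:00
        record["OBS-TIME"] = datetime.strptime(record["OBS-TIME"], "%H%M").time()
    return record


print(parse_record("AE000041196,20170101,TAVG,217,H,,S,"))
```

Spark does the equivalent column split for us below, but keeps every field as a string until a schema is supplied.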




## Download and inspect the data files

In [5]:
%%bash
# Download one gzipped CSV per year into data/weather
START=2017
END=2018
for ((i=START; i<=END; i++)); do
    wget -P data/weather -q https://www1.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/$i.csv.gz
    echo "Finished $i."
done

Finished 2017.
Finished 2018.
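The same loop can be written in plain Python with the standard library. This is a sketch, not part of the original workflow: `year_urls` and `download_years` are hypothetical helpers, the base URL is copied from the cell above (NOAA may relocate it), and only the URL construction runs without network access.

```python
import os
import urllib.request

# Base URL copied from the wget loop above; NOAA may relocate this archive.
BASE_URL = "https://www1.ncdc.noaa.gov/pub/data/ghcn/daily/by_year"


def year_urls(start, end):
    """Return the yearly archive URL for every year from start to end inclusive."""
    return [f"{BASE_URL}/{year}.csv.gz" for year in range(start, end + 1)]


def download_years(start, end, dest="data/weather"):
    """Fetch each yearly archive into `dest` (requires network access)."""
    os.makedirs(dest, exist_ok=True)
    for year, url in zip(range(start, end + 1), year_urls(start, end)):
        urllib.request.urlretrieve(url, os.path.join(dest, f"{year}.csv.gz"))
        print(f"Finished {year}.")


print(year_urls(2017, 2018))
# download_years(2017, 2018)  # hypothetical call, same effect as the bash loop
```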


In [6]:
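from pyspark.sql import SparkSession

# Connect to the standalone cluster and write event logs to the shared
# history directory, so a History Server reading it can show finished apps
spark = (
    SparkSession.builder
    .appName("pyspark-1")
    .master("spark://spark-master:7077")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "/opt/workspace/history")
    .getOrCreate()
)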

In [7]:
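# Read every .csv.gz file in the directory; gzip is decompressed transparently.
# There is no header row, so Spark names the columns _c0 ... _c7.
data = spark.read.csv("data/weather")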

In [8]:
data.where('_c5 is not null').show(10)

+-----------+--------+----+----+----+---+---+----+
|        _c0|     _c1| _c2| _c3| _c4|_c5|_c6| _c7|
+-----------+--------+----+----+----+---+---+----+
|ASN00061390|20170101|PRCP|   0|null|  L|  a|null|
|ASN00065036|20170101|DAPR|   2|null|  L|  a|null|
|ASN00065036|20170101|MDPR|  10|null|  L|  a|null|
|ASN00073141|20170101|TMAX| 223|null|  S|  a|null|
|CA001037090|20170101|WSFG|1190|null|  X|  C|null|
|CA001039035|20170101|SNWD|   0|null|  I|  C|null|
|CA00109E7R6|20170101|SNWD|   0|null|  I|  C|null|
|CA001106CL2|20170101|TMAX|   0|null|  N|  C|null|
|CA001106CL2|20170101|TMIN|   0|null|  N|  C|null|
|CA001106CL2|20170101|SNOW|   0|null|  I|  C|null|
+-----------+--------+----+----+----+---+---+----+
only showing top 10 rows
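Spark reads the empty CSV fields as null, so `_c5 is not null` keeps only rows whose sixth field, the Q-FLAG, is set; per the GHCN-Daily readme, a non-blank quality flag means the value failed one of the quality-assurance checks. A pure-Python sketch of the same filter with a tally per flag code, using a few rows copied from the outputs in this notebook (the `flagged_counts` helper is a hypothetical name; see the readme for what each code means):

```python
from collections import Counter

# (ID, YYYYMMDD, ELEMENT, DATA_VALUE, M-FLAG, Q-FLAG, S-FLAG, OBS-TIME)
rows = [
    ("ASN00061390", "20170101", "PRCP", "0", None, "L", "a", None),
    ("ASN00073141", "20170101", "TMAX", "223", None, "S", "a", None),
    ("CA001037090", "20170101", "WSFG", "1190", None, "X", "C", None),
    ("CA001106CL2", "20170101", "TMAX", "0", None, "N", "C", None),
    ("AE000041196", "20170101", "TMIN", "163", None, None, "S", None),
]

Q_FLAG = 5  # position of Q-FLAG, i.e. Spark's _c5


def flagged_counts(records):
    """Count records per Q-FLAG code, ignoring rows that passed all checks."""
    return Counter(r[Q_FLAG] for r in records if r[Q_FLAG] is not None)


print(flagged_counts(rows))
```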



In [9]:
header = "ID, YYYYMMDD, ELEMENT, DATA_VALUE, M-FLAG, Q-FLAG, S-FLAG, OBS-TIME".split(', ')
header

['ID',
 'YYYYMMDD',
 'ELEMENT',
 'DATA_VALUE',
 'M-FLAG',
 'Q-FLAG',
 'S-FLAG',
 'OBS-TIME']

In [10]:
data.schema.names

['_c0', '_c1', '_c2', '_c3', '_c4', '_c5', '_c6', '_c7']
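The positional names and `header` line up one-to-one, so renaming is a left fold: start from the original frame and apply one rename per index. The same fold over plain lists of names, with a hypothetical `rename` helper standing in for `withColumnRenamed`, shows the mechanics without a Spark session:

```python
from functools import reduce

old_columns = ["_c0", "_c1", "_c2", "_c3", "_c4", "_c5", "_c6", "_c7"]
header = ["ID", "YYYYMMDD", "ELEMENT", "DATA_VALUE",
          "M-FLAG", "Q-FLAG", "S-FLAG", "OBS-TIME"]


def rename(columns, old, new):
    """List analogue of DataFrame.withColumnRenamed: replace `old` with `new`."""
    return [new if c == old else c for c in columns]


# Thread the accumulated column list through one rename per index.
renamed = reduce(lambda cols, i: rename(cols, old_columns[i], header[i]),
                 range(len(old_columns)), old_columns)
print(renamed)
```

In PySpark, `data.toDF(*header)` performs the same positional rename in a single call.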

In [11]:
from functools import reduce

# Rename _c0 ... _c7 to the header names, one withColumnRenamed call per column
oldColumns = data.schema.names
df = reduce(lambda acc, idx: acc.withColumnRenamed(oldColumns[idx], header[idx]),
            range(len(oldColumns)), data)
df.printSchema()
df.show(10)

root
 |-- ID: string (nullable = true)
 |-- YYYYMMDD: string (nullable = true)
 |-- ELEMENT: string (nullable = true)
 |-- DATA_VALUE: string (nullable = true)
 |-- M-FLAG: string (nullable = true)
 |-- Q-FLAG: string (nullable = true)
 |-- S-FLAG: string (nullable = true)
 |-- OBS-TIME: string (nullable = true)

+-----------+--------+-------+----------+------+------+------+--------+
|         ID|YYYYMMDD|ELEMENT|DATA_VALUE|M-FLAG|Q-FLAG|S-FLAG|OBS-TIME|
+-----------+--------+-------+----------+------+------+------+--------+
|AE000041196|20170101|   TMIN|       163|  null|  null|     S|    null|
|AE000041196|20170101|   TAVG|       217|     H|  null|     S|    null|
|AEM00041194|20170101|   PRCP|         0|  null|  null|     S|    null|
|AEM00041194|20170101|   TAVG|       223|     H|  null|     S|    null|
|AEM00041218|20170101|   TMIN|       137|  null|  null|     S|    null|
|AEM00041218|20170101|   TAVG|       202|     H|  null|     S|    null|
|AEM00041217|20170101|   TMIN|       
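`DATA_VALUE` is an integer in element-specific units. Per section III of the GHCN-Daily readme, TMAX, TMIN and TAVG are in tenths of degrees C, PRCP is in tenths of mm, and SNOW and SNWD are in mm, so the first row above (TMIN = 163) means 16.3 °C. A minimal conversion sketch covering only those elements (the `SCALE` table and `to_physical` helper are hypothetical names; any other element is left unconverted):

```python
# Scale factors and units for a few common elements (from the GHCN-Daily readme).
SCALE = {
    "TMAX": (0.1, "degC"),
    "TMIN": (0.1, "degC"),
    "TAVG": (0.1, "degC"),
    "PRCP": (0.1, "mm"),
    "SNOW": (1.0, "mm"),
    "SNWD": (1.0, "mm"),
}


def to_physical(element, data_value):
    """Convert a raw DATA_VALUE string to (value, unit); None for unlisted elements."""
    if element not in SCALE:
        return None
    factor, unit = SCALE[element]
    return round(int(data_value) * factor, 1), unit


print(to_physical("TMIN", "163"))  # -> (16.3, 'degC')
```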

In [12]:
df.rdd.getNumPartitions()

2
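Two partitions for two `.csv.gz` files is no coincidence: gzip is not a splittable codec, so Spark cannot divide one file across tasks. Small files can still be packed together into one partition, so the count does not always equal the number of files. The toy model below is based on our understanding of Spark's file-packing step and is only a rough sanity check, not Spark's planner: `estimate_partitions` is a hypothetical helper, `parallelism` stands in for the cluster's default parallelism, and the defaults mirror `spark.sql.files.maxPartitionBytes` (128 MB) and `spark.sql.files.openCostInBytes` (4 MB).

```python
MB = 1024 * 1024


def estimate_partitions(file_sizes, splittable, max_partition_bytes=128 * MB,
                        open_cost_bytes=4 * MB, parallelism=2):
    """Rough estimate of how many partitions a file scan produces.

    Splittable files are cut into chunks of at most max_split bytes;
    non-splittable files (e.g. gzip) stay whole. Chunks are then packed
    largest-first into partitions of roughly max_split bytes.
    """
    total = sum(size + open_cost_bytes for size in file_sizes)
    bytes_per_core = total // parallelism
    max_split = min(max_partition_bytes, max(open_cost_bytes, bytes_per_core))

    chunks = []
    for size in file_sizes:
        if splittable:
            chunks += [min(max_split, size - off) for off in range(0, size, max_split)]
        else:
            chunks.append(size)

    partitions, current = 0, 0
    for chunk in sorted(chunks, reverse=True):
        # Close the current partition if this chunk would overflow it.
        if current > 0 and current + chunk > max_split:
            partitions += 1
            current = 0
        current += chunk + open_cost_bytes
    return partitions + (1 if current > 0 else 0)


# Made-up sizes: two large gzip files land in separate partitions.
print(estimate_partitions([180 * MB, 180 * MB], splittable=False))
```

If more parallelism is needed after reading gzip input, `df.repartition(n)` redistributes the rows at the cost of a shuffle.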

In [None]:
spark.stop()