In [42]:
// Starting Spark Job
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@1e4b87fe
import org.apache.spark.sql.functions._


In [3]:
// Load files to DataFrame
val weather = spark.read
                .option("header", true)
                .option("inferSchema", true)
                .csv("data/2019")
                .cache()

val stationList = spark.read
                .option("header", true)
                .option("inferSchema", true)
                .csv("stationlist.csv")

val countryList = spark.read
                .option("header", true)
                .option("inferSchema", true)
                .csv("countrylist.csv")

weather: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [STN---: int, WBAN: int ... 14 more fields]
stationList: org.apache.spark.sql.DataFrame = [STN_NO: string, COUNTRY_ABBR: string]
countryList: org.apache.spark.sql.DataFrame = [COUNTRY_ABBR: string, COUNTRY_FULL: string]


In [4]:
// Show schemas and sample data
weather.printSchema()
stationList.printSchema()
countryList.printSchema()
weather.show(2, false)
stationList.show(2, false)
countryList.show(2, false)


root
 |-- STN---: integer (nullable = true)
 |-- WBAN: integer (nullable = true)
 |-- YEARMODA: integer (nullable = true)
 |-- TEMP: double (nullable = true)
 |-- DEWP: double (nullable = true)
 |-- SLP: double (nullable = true)
 |-- STP: double (nullable = true)
 |-- VISIB: double (nullable = true)
 |-- WDSP: double (nullable = true)
 |-- MXSPD: double (nullable = true)
 |-- GUST: double (nullable = true)
 |-- MAX: string (nullable = true)
 |-- MIN: string (nullable = true)
 |-- PRCP: string (nullable = true)
 |-- SNDP: double (nullable = true)
 |-- FRSHTT: integer (nullable = true)

root
 |-- STN_NO: string (nullable = true)
 |-- COUNTRY_ABBR: string (nullable = true)

root
 |-- COUNTRY_ABBR: string (nullable = true)
 |-- COUNTRY_FULL: string (nullable = true)

+------+-----+--------+----+----+------+------+-----+----+-----+----+-----+-----+-----+----+------+
|STN---|WBAN |YEARMODA|TEMP|DEWP|SLP   |STP   |VISIB|WDSP|MXSPD|GUST|MAX  |MIN  |PRCP |SNDP|FRSHTT|
+------+-----+--------+---

In [5]:
// Prepare station - country DF
val stationCountry = stationList.join(countryList, stationList("COUNTRY_ABBR") === countryList("COUNTRY_ABBR"))
            .select("STN_NO", "COUNTRY_FULL")
            .cache()

stationCountry: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [STN_NO: string, COUNTRY_FULL: string]


In [6]:
// Get Min and Max Value of Date
// Make sure there is no out-of-range date
val dateDF = weather.agg(min("YEARMODA"), max("YEARMODA")).show


        

+-------------+-------------+
|min(YEARMODA)|max(YEARMODA)|
+-------------+-------------+
|     20190101|     20200101|
+-------------+-------------+



dateDF: Unit = ()


In [27]:
/* 

Q1: Which country had the hottest average mean temperature over the year? 

*/

val stationTempDF = weather.select("STN---", "TEMP")
                            .cache()

val countryTempDF = stationTempDF.join(stationCountry, stationTempDF("STN---") === stationCountry("STN_NO"))
                                    .cache()

val maxMeanTemp = countryTempDF.select("COUNTRY_FULL", "TEMP")
        .filter($"TEMP" < 9999.9)
        .groupBy("COUNTRY_FULL")
        .agg(avg("TEMP").as("avg_temp"))
        .orderBy(desc("avg_temp"))
        .limit(1)
        .cache()
        .show


+------------+-----------------+
|COUNTRY_FULL|         avg_temp|
+------------+-----------------+
|    DJIBOUTI|90.06114457831325|
+------------+-----------------+



stationTempDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [STN---: int, TEMP: double]
countryTempDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [STN---: int, TEMP: double ... 2 more fields]
maxMeanTemp: Unit = ()


In [54]:
/* 

Q2: Which country had the most consecutive days of tornadoes/funnel cloud formations?

*/


val stationFRSHTTDF = weather.select("STN---", "YEARMODA", "FRSHTT")
                                .cache()

val countryTempDF = stationFRSHTTDF.join(stationCountry, stationTempDF("STN---") === stationCountry("STN_NO"))
                                    .cache()

// val country = countryTempDF.filter(($"FRSHTT" / 10) % 10 === 1)
//                             .select("YEARMODA", "COUNTRY_FULL")
//                             .dropDuplicates()
//                             .groupBy("COUNTRY_FULL")
//                             .agg(sort_array(collect_list("YEARMODA")))
//                             .show



//                             .orderBy("YEARMODA")
//                             .select("YEARMODA", "COUNTRY_FULL")
//                             .show(100)
//                             .repartition($"COUNTRY_FULL")
//                             .orderBy("YEARMODA")
//                             .show   


+------------+--------+------+
|COUNTRY_FULL|YEARMODA|rowNum|
+------------+--------+------+
|     ARMENIA|20190101|     1|
|     ARMENIA|20190102|     2|
|     ARMENIA|20190103|     3|
|     ARMENIA|20190104|     4|
|     ARMENIA|20190105|     5|
|     ARMENIA|20190106|     6|
|     ARMENIA|20190107|     7|
|     ARMENIA|20190108|     8|
|     ARMENIA|20190109|     9|
|     ARMENIA|20190110|    10|
|     ARMENIA|20190111|    11|
|     ARMENIA|20190112|    12|
|     ARMENIA|20190113|    13|
|     ARMENIA|20190114|    14|
|     ARMENIA|20190115|    15|
|     ARMENIA|20190116|    16|
|     ARMENIA|20190117|    17|
|     ARMENIA|20190118|    18|
|     ARMENIA|20190119|    19|
|     ARMENIA|20190120|    20|
+------------+--------+------+
only showing top 20 rows



stationFRSHTTDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [STN---: int, YEARMODA: int ... 1 more field]
countryTempDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [STN---: int, YEARMODA: int ... 3 more fields]
country: Unit = ()


In [66]:
val country = countryTempDF
        .select("YEARMODA", "COUNTRY_FULL", "FRSHTT")
        .withColumn("Tornado", when(($"FRSHTT" / 10) % 10 === 1, 1).otherwise(0))
        .select("YEARMODA", "COUNTRY_FULL", "Tornado")
        .dropDuplicates()
        .createOrReplaceTempView("table")

spark.sql("SELECT COUNTRY_FULL, 
            YEARMODA, 
            Tornado, 
            GroupingSet = DATEADD(DAY, -row_number() over (partition by COUNTRY_FULL order by YEARMODA), UserDate)
            FROM table").show

<console>: 9: error: unclosed string literal

In [28]:
/* 

Q3: Which country had the second highest average mean wind speed over the year?

*/

val stationWDSPDF = weather.select("STN---", "WDSP")
                            .cache()

val countryWDSPDF = stationWDSPDF.join(stationCountry, stationWDSPDF("STN---") === stationCountry("STN_NO"))
                                    .cache()

val maxMeanTemp = countryWDSPDF.select("COUNTRY_FULL", "WDSP")
        .filter($"WDSP" < 9999.9)
        .groupBy("COUNTRY_FULL")
        .agg(avg("WDSP").as("avg_wdsp"))
        .orderBy(desc("avg_wdsp"))
        .limit(2)
        .orderBy(asc("avg_wdsp"))
        .limit(1)
        .cache()
        .show


+------------+------------------+
|COUNTRY_FULL|          avg_wdsp|
+------------+------------------+
|     ARMENIA|457.36593182657737|
+------------+------------------+



stationWDSPDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [STN---: int, WDSP: double]
countryWDSPDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [STN---: int, WDSP: double ... 2 more fields]
maxMeanTemp: Unit = ()
