In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ML2020SpringHW1").getOrCreate()
spark

In [10]:
df = (
    spark.read.csv("./train.csv", header=True, encoding="big5")
    .drop("測站")
    .withColumnRenamed("日期", "date")
    .withColumnRenamed("測項", "metric")
)
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- metric: string (nullable = true)
 |-- 0: string (nullable = true)
 |-- 1: string (nullable = true)
 |-- 2: string (nullable = true)
 |-- 3: string (nullable = true)
 |-- 4: string (nullable = true)
 |-- 5: string (nullable = true)
 |-- 6: string (nullable = true)
 |-- 7: string (nullable = true)
 |-- 8: string (nullable = true)
 |-- 9: string (nullable = true)
 |-- 10: string (nullable = true)
 |-- 11: string (nullable = true)
 |-- 12: string (nullable = true)
 |-- 13: string (nullable = true)
 |-- 14: string (nullable = true)
 |-- 15: string (nullable = true)
 |-- 16: string (nullable = true)
 |-- 17: string (nullable = true)
 |-- 18: string (nullable = true)
 |-- 19: string (nullable = true)
 |-- 20: string (nullable = true)
 |-- 21: string (nullable = true)
 |-- 22: string (nullable = true)
 |-- 23: string (nullable = true)



In [11]:
df.show()

+--------+----------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|    date|    metric|   0|   1|   2|   3|   4|   5|   6|   7|   8|   9|  10|  11|  12|  13|  14|  15|  16|  17|  18|  19|  20|  21|  22|  23|
+--------+----------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|2014/1/1|  AMB_TEMP|  14|  14|  14|  13|  12|  12|  12|  12|  15|  17|  20|  22|  22|  22|  22|  22|  21|  19|  17|  16|  15|  15|  15|  15|
|2014/1/1|       CH4| 1.8| 1.8| 1.8| 1.8| 1.8| 1.8| 1.8| 1.8| 1.8| 1.8| 1.8| 1.8| 1.8| 1.8| 1.8| 1.8| 1.8| 1.8| 1.8| 1.8| 1.8| 1.8| 1.8| 1.8|
|2014/1/1|        CO|0.51|0.41|0.39|0.37|0.35| 0.3|0.37|0.47|0.78|0.74|0.59|0.52|0.41| 0.4|0.37|0.37|0.47|0.69|0.56|0.45|0.38|0.35|0.36|0.32|
|2014/1/1|      NMHC| 0.2|0.15|0.13|0.12|0.11|0.06| 0.1|0.13|0.26|0.23| 0.2|0.18|0.12|0.11| 0.1|0.13|0.14|0.23|0.18|0.12| 0.1|0.09| 0.1|0.08|
|2014/

In [53]:
df_r = (
    df
    .selectExpr(
        "to_date(date, 'yyyy/MM/dd') as date",
        "metric",
        """
        explode(
          arrays_zip(
            array(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23),
            array(`0`,`1`,`2`,`3`,`4`,`5`,`6`,`7`,`8`,`9`,`10`,`11`,`12`,`13`,`14`,`15`,`16`,`17`,`18`,`19`,`20`,`21`,`22`,`23`)
          )
        ) as value
        """,
    )
    .selectExpr(
        "date",
        "value.`0` as hour",
        "metric",
        "value.`1` as value",
    )
    .selectExpr(
        "date",
        "hour",
        "metric",
        """case
        when value = 'NR' then
          null
        else
          float(value)
        end as value""",
    )
)
df_r.printSchema()
df_r.show(10)

root
 |-- date: date (nullable = true)
 |-- hour: integer (nullable = true)
 |-- metric: string (nullable = true)
 |-- value: float (nullable = true)

+----------+----+--------+-----+
|      date|hour|  metric|value|
+----------+----+--------+-----+
|2014-01-01|   0|AMB_TEMP| 14.0|
|2014-01-01|   1|AMB_TEMP| 14.0|
|2014-01-01|   2|AMB_TEMP| 14.0|
|2014-01-01|   3|AMB_TEMP| 13.0|
|2014-01-01|   4|AMB_TEMP| 12.0|
|2014-01-01|   5|AMB_TEMP| 12.0|
|2014-01-01|   6|AMB_TEMP| 12.0|
|2014-01-01|   7|AMB_TEMP| 12.0|
|2014-01-01|   8|AMB_TEMP| 15.0|
|2014-01-01|   9|AMB_TEMP| 17.0|
+----------+----+--------+-----+
only showing top 10 rows



In [54]:
from pyspark.sql import functions as fns

(
    df_r.groupBy("metric")
    .agg(
        fns.min("value").alias("min_value"),
        fns.max("value").alias("max_value"),
    )
    .orderBy("metric").show()
)

+----------+---------+---------+
|    metric|min_value|max_value|
+----------+---------+---------+
|  AMB_TEMP|    -12.3|     36.0|
|       CH4|     -0.2|      2.0|
|        CO|    -0.12|     7.57|
|      NMHC|      0.0|      1.3|
|        NO|     -1.1|     31.0|
|       NO2|      0.0|     46.0|
|       NOx|     -2.4|     71.0|
|        O3|      0.0|    231.0|
|      PM10|      0.0|    181.0|
|     PM2.5|     -1.0|    112.0|
|  RAINFALL|      0.0|     74.0|
|        RH|     29.0|     99.0|
|       SO2|     -1.6|     22.0|
|       THC|     -0.2|      3.0|
|     WD_HR|      0.1|    360.0|
|WIND_DIREC|      0.0|    360.0|
|WIND_SPEED|      0.0|      7.7|
|     WS_HR|      0.0|      7.0|
+----------+---------+---------+

