In [1]:
# Echo `sc` so the cell displays whatever object is bound to that name.
# Nothing in the visible cells creates it — presumably the SparkContext
# injected by the PySpark kernel/shell; confirm before relying on it.
sc

In [2]:
# Build a DataFrame from spark.range(1000) and name its only column "number".
myRange = (
    spark.range(1000)
    .toDF("number")
)

In [3]:
# Keep only the rows whose `number` is even (SQL-expression predicate).
divisBy2 = (
    myRange
    .where("number % 2 = 0")
)

In [4]:
# Count the rows left after the even-number filter; the recorded run printed 500.
divisBy2.count()

500

In [1]:
from pyspark.shell import SparkSession

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.4.1
      /_/

Using Python version 3.8.10 (default, May 26 2023 14:05:08)
Spark context Web UI available at http://ip-172-31-63-145.ap-northeast-2.compute.internal:4040
Spark context available as 'sc' (master = local[*], app id = local-1691551769202).
SparkSession available as 'spark'.


In [2]:
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

In [3]:
from pyspark.sql.functions import (to_json, posexplode, udf,col,
                                  avg, round, current_timestamp,
                                  min,max,from_unixtime, lit,array)

In [4]:
# Session handle used by every later cell in this notebook.
# NOTE(review): the import cell's output reports that a session already exists
# as `spark`; getOrCreate() presumably returns that one — confirm the app name
# below actually takes effect.
spark_session = (
    SparkSession.builder
    .appName("airdata-batch-aggregation")
    .getOrCreate()
)

In [5]:
# Root location (s3a URI) of the raw air-quality dataset read below.
# NOTE(review): the bucket name is hard-coded and carries a "-dev" suffix;
# consider moving it to configuration if this runs in other environments.
data_source_path = (
    "s3a://de432-raw-apnortheast2-073658113926-dev/airkorea/kr/airdata/"
)

In [6]:
# Explicit read schema: one epoch-style long timestamp, one long PM10 reading
# and four double-valued gas readings. Every field is declared non-nullable.
schema = (
    StructType()
    .add("event_time", LongType(), False)
    .add("pm_10", LongType(), False)
    .add("o3", DoubleType(), False)
    .add("no2", DoubleType(), False)
    .add("co", DoubleType(), False)
    .add("so2", DoubleType(), False)
)

In [7]:
# Load the parquet dataset at data_source_path using the explicit schema above.
airDF = (
    spark_session.read
    .format("parquet")
    .schema(schema)
    .option("path", data_source_path)
    .load()
)

In [9]:
# Print a preview of airDF using show()'s default arguments.
# NOTE(review): the recorded output also lists year/month/day/hour columns that
# the explicit schema does not declare — presumably partition columns inferred
# from the directory layout; confirm.
airDF.show()

+----------+-----+-----+-----+---+-----+----+-----+---+----+
|event_time|pm_10|   o3|  no2| co|  so2|year|month|day|hour|
+----------+-----+-----+-----+---+-----+----+-----+---+----+
|1691542800|   11|0.022|0.009|0.3|0.002|2023|    8|  9|  10|
|1691539200|    6| 0.02|0.008|0.3|0.002|2023|    8|  9|  10|
|1691535600|    6|0.018|0.008|0.3|0.002|2023|    8|  9|  10|
|1691532000|   12|0.018|0.007|0.3|0.003|2023|    8|  9|  10|
|1691528400|    6| 0.02|0.005|0.2|0.002|2023|    8|  9|  10|
|1691524800|    3|0.021|0.004|0.2|0.002|2023|    8|  9|  10|
|1691521200|    5|0.021|0.004|0.2|0.003|2023|    8|  9|  10|
|1691517600|    5|0.019|0.005|0.2|0.002|2023|    8|  9|  10|
|1691514000|    5|0.018|0.005|0.2|0.003|2023|    8|  9|  10|
|1691510400|    6|0.017|0.006|0.2|0.003|2023|    8|  9|  10|
|1691506800|    6|0.017|0.007|0.3|0.002|2023|    8|  9|  10|
|1691503200|   10|0.016|0.008|0.3|0.002|2023|    8|  9|  10|
|1691499600|    8|0.018|0.008|0.3|0.003|2023|    8|  9|  10|
|1691496000|   10|0.021|

In [10]:
# Measurement columns to pack together, in the order they will appear in the array.
target_columns = ["pm_10", "o3", "no2", "co", "so2"]

# Collapse the measurement columns into one array column named "values" and
# keep only the timestamp next to it. In the recorded preview the long pm_10
# value appears as 11.0, i.e. the array elements share one floating type.
transAirDF = (
    airDF
    .withColumn("values", array(target_columns))
    .select("event_time", "values")
)

In [11]:
# Preview two rows without truncating the array column.
transAirDF.show(n=2, truncate=False)

+----------+--------------------------------+
|event_time|values                          |
+----------+--------------------------------+
|1691542800|[11.0, 0.022, 0.009, 0.3, 0.002]|
|1691539200|[6.0, 0.02, 0.008, 0.3, 0.002]  |
+----------+--------------------------------+
only showing top 2 rows



In [14]:
def get_stattype(index):
    """Return the stat id stored at position ``index`` of ``[1, 2, 3, 4, 5]``.

    Plain list indexing is used, so a negative ``index`` counts from the end
    of the list and an ``index`` outside the list raises ``IndexError``.
    """
    return [1, 2, 3, 4, 5][index]

# Wrap get_stattype as a Spark UDF so it can be applied to a column.
# NOTE(review): no returnType is passed; per the PySpark docs the default is
# StringType, so stat_id presumably comes back as a string column — confirm
# that is intended rather than an integer id.
get_stattype_udf = udf(get_stattype)

In [18]:
# Turn each row's `values` array into one row per measurement:
#   1. posexplode emits a (pos, col) pair for every array element,
#   2. pos is mapped to a stat id through get_stattype_udf,
#   3. the exploded `col` column is renamed to stat_value,
#   4. only event_time / stat_id / stat_value are kept.
stageAirDF = (
    transAirDF
    .select("event_time", posexplode(col("values")))
    .withColumn("stat_id", get_stattype_udf(col("pos")))
    .withColumnRenamed("col", "stat_value")
    .select("event_time", "stat_id", "stat_value")
)

# show() only prints and returns None, so it must not be part of the assignment
# above — otherwise stageAirDF would be None instead of the DataFrame.
stageAirDF.show(10, False)

+----------+-------+----------+
|event_time|stat_id|stat_value|
+----------+-------+----------+
|1691542800|1      |11.0      |
|1691542800|2      |0.022     |
|1691542800|3      |0.009     |
|1691542800|4      |0.3       |
|1691542800|5      |0.002     |
|1691539200|1      |6.0       |
|1691539200|2      |0.02      |
|1691539200|3      |0.008     |
|1691539200|4      |0.3       |
|1691539200|5      |0.002     |
+----------+-------+----------+
only showing top 10 rows

