In [12]:
import os
import boto3
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, to_timestamp, year
from pyspark.sql.types import StructType, StructField, StringType, FloatType

In [13]:
# S3 bucket that holds the raw `weather/` input and receives the `weather-tavg/` output.
# Read from DATA_LAKE_BUCKET so each deployment can point at its own bucket.
# The fallback literal ends in "-". S3 bucket names must begin and end with a letter
# or digit, so it looks like a name whose suffix was stripped. Reject such a value
# here instead of letting the S3A connector fail later with an obscure error.
bucket = os.environ.get("DATA_LAKE_BUCKET", "spark-eks-data-lake-")
if not (bucket and bucket[0].isalnum() and bucket[-1].isalnum()):
    raise ValueError(
        f"Invalid S3 bucket name {bucket!r}: names must begin and end with a letter "
        "or digit. Set the DATA_LAKE_BUCKET environment variable to the full bucket name."
    )

In [14]:
# Resolve AWS credentials through boto3's default credential chain.
session = boto3.Session()
resolved_credentials = session.get_credentials()
if resolved_credentials is None:
    # get_credentials() returns None when no credential source is found; fail here
    # with an actionable message instead of an AttributeError on None.
    raise RuntimeError(
        "No AWS credentials found by boto3; configure a profile, environment "
        "variables, or an IAM role before running this notebook."
    )
# Take one frozen snapshot so the access key, secret key and (for temporary
# credentials) session token all come from the same refresh.
credentials = resolved_credentials.get_frozen_credentials()
access_key = credentials.access_key
secret_key = credentials.secret_key

In [15]:
# Local Spark session that reads and writes S3 through the S3A connector, using the
# boto3 credentials resolved above.
# NOTE(review): these are a frozen snapshot; if temporary credentials expire
# mid-job, S3 requests will start failing.
spark_builder = (
    SparkSession.builder
    .appName("WeatherData")
    .master("local[*]")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.access.key", access_key)
    .config("spark.hadoop.fs.s3a.secret.key", secret_key)
)

# Temporary credentials (assumed roles, SSO, EKS IRSA, ...) carry a session token.
# S3 rejects the key pair without it. Pass the token through and switch S3A to
# the provider that reads access key, secret key and token together.
if credentials.token:
    spark_builder = (
        spark_builder
        .config("spark.hadoop.fs.s3a.session.token", credentials.token)
        .config(
            "spark.hadoop.fs.s3a.aws.credentials.provider",
            "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider",
        )
    )

spark = spark_builder.getOrCreate()

In [16]:
# Explicit column layout for the header-less weather CSV files, in file order:
# (column name, Spark type, nullable).
# NOTE(review): the m/q/s flag and obs_time columns resemble the NOAA GHCN-Daily
# layout. If that is the source, `value` is stored in tenths of a unit — confirm
# before interpreting averages. Spark's file readers may also relax nullable=False
# on read, so those flags may document intent rather than enforce it — verify.
_weather_columns = (
    ("station_id", StringType(), False),
    ("date", StringType(), False),
    ("element", StringType(), False),
    ("value", FloatType(), False),
    ("m_flag", StringType(), True),
    ("q_flag", StringType(), True),
    ("s_flag", StringType(), True),
    ("obs_time", StringType(), True),
)
schema = StructType(
    [StructField(name, data_type, nullable) for name, data_type, nullable in _weather_columns]
)

In [17]:
# Load every CSV object under the bucket's weather/ prefix. The files have no header
# row, so the explicit schema above names and types the columns. The result is
# marked for caching.
weather_input_path = f"s3a://{bucket}/weather/"
df = (
    spark.read
    .csv(weather_input_path, header=False, schema=schema)
    .cache()
)

In [18]:
# Mean of the TAVG readings per station and calendar year. The year is parsed from
# the yyyyMMdd `date` string. The result is marked for caching.
# NOTE(review): rows are not filtered on q_flag. If this is GHCN-Daily data, a
# non-empty q_flag marks a failed quality check — confirm whether those rows belong here.
observation_year = year(to_timestamp(col("date"), "yyyyMMdd"))
df_tavg = (
    df.where(col("element") == "TAVG")
    .withColumn("year", observation_year)
    .groupBy("station_id", "year")
    .agg(avg(col("value")).alias("tavg"))
    .cache()
)

In [19]:
# Persist the per-station yearly averages as JSON under the bucket's weather-tavg/
# prefix, replacing whatever output is already there.
tavg_output_path = f"s3a://{bucket}/weather-tavg/"
df_tavg.write.mode("overwrite").json(tavg_output_path)

In [20]:
# Shut down the Spark session; the cached DataFrames above cannot be used after this.
spark.stop()