# Slugplots with SparkR

First, we'll use a container image with all the necessary libraries. 

**Make sure to replace the `<ACCOUNT_ID>` and `<REGION>` placeholders with the appropriate values.**

In [None]:
%%sh

# Comment out the widget as it raises a (non-critical) exception for R.
# Pass `-i` and `-e` separately: GNU sed reads `-ie` as "edit in place and keep
# a backup with suffix 'e'", which leaves a stray `*.pye` copy next to the file.
sed -i -e '/^spark_monitoring_widget/s/^/#/' /home/emr-notebook/.ipython/profile_default/startup/init_spark_monitoring_widget.py

In [None]:
%%configure -f
{
    "conf": {
        "spark.submit.deployMode": "cluster",
        "spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE": "docker",
        "spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE": "<ACCOUNT_ID>.dkr.ecr.<REGION>.amazonaws.com/emr-docker-sparkr:latest",
        "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE": "docker",
        "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE": "<ACCOUNT_ID>.dkr.ecr.<REGION>.amazonaws.com/emr-docker-sparkr:latest"
    }
}

Now, we'll read a single CSV file from the NOAA GSOD dataset in order to determine the schema.

In [None]:
# Load a single station-year CSV and let Spark infer the column types.
# The inferred schema is captured so the multi-year read can reuse it.
df <- read.df(
  "s3://noaa-gsod-pds/1999/01001099999.csv",
  source = "csv",
  header = "true",
  inferSchema = "true",
  delimiter = ",",
  na.strings = ""
)
isd_schema <- schema(df)

Now that we have the schema, we'll go ahead and read data from 2000-2021.

In [None]:
# Read every year's directory with the previously captured schema, then stack
# the per-year SparkDataFrames into a single one.
years <- 2000:2021
All <- lapply(years, function(yr) {
  # The trailing "" makes paste() end the path with "/", i.e. the year's directory.
  year_dir <- paste("s3://noaa-gsod-pds", yr, "", sep = "/")
  read.df(
    year_dir,
    source = "csv",
    schema = isd_schema,
    header = "true",
    delimiter = ",",
    na.strings = ""
  )
})
df_all_years <- do.call("rbind", All)

Filter everything down to Seattle.

In [None]:
# Corners of the bounding box used to select Seattle rows.
longLeft <- -122.459696
longRight <- -122.224433
latBottom <- 47.481002
latTop <- 47.734136

# Build one predicate per axis (bounds inclusive), then keep rows matching both.
lat_in_box <- df_all_years$LATITUDE >= latBottom &
  df_all_years$LATITUDE <= latTop
long_in_box <- df_all_years$LONGITUDE >= longLeft &
  df_all_years$LONGITUDE <= longRight
seattle_df <- filter(df_all_years, lat_in_box & long_in_box)

Aggregate by day and collect it locally.

In [None]:
# Average TEMP across all selected rows for each DATE.
# NOTE(review): the GSOD format reportedly encodes a missing TEMP as 9999.9;
# confirm no such rows reach this average, or filter them out first.
by_day <- groupBy(seattle_df, "DATE")
avg_daily_df <- agg(by_day, TEMP = "avg")

# Give the aggregate column a plot-friendly name, then pull the result into a
# local R data.frame.
renamed_df <- rename(
  avg_daily_df,
  "Mean Temperature [F]" = avg_daily_df$`avg(TEMP)`
)
local_data <- collect(renamed_df)

head(local_data)

Now we create our plot! First the static plot, just to make sure things look reasonable.

In [None]:
library(ggridges)
library(ggplot2)

# Tweak the data a little bit:
# - Order by date
# - Add a numeric "year" column
# - Add a "month" factor column with the full English month name
local_data <- local_data[order(local_data$DATE), ]

# Parse the dates once and reuse them for both derived columns.
obs_date <- as.Date(local_data$DATE)
local_data$year <- as.numeric(format(obs_date, format = "%Y"))

# Look the name up in month.name by month number instead of calling months():
# months() returns names in the session locale, which would not match the
# English factor levels below and would turn every month into NA.
month_number <- as.integer(format(obs_date, format = "%m"))
local_data$month <- factor(month.name[month_number], levels = rev(month.name))

# Ridgeline plot: one temperature density per month, filled by temperature.
# NOTE(review): newer ggplot2 releases deprecate stat() in favour of
# after_stat(); switch once the image's ggplot2 version is confirmed.
spl <- ggplot(
  local_data,
  aes(x = `Mean Temperature [F]`, y = month, fill = stat(x))
) +
  geom_density_ridges_gradient(scale = 3, rel_min_height = 0.01) +
  scale_fill_viridis_c(name = "Temp. [F]", option = "C") +
  labs(title = "Temperatures in Seattle between 2000-2021") +
  theme_ridges(font_size = 13, grid = TRUE) +
  theme(axis.title.y = element_blank())

print(spl)

Now create the animated plot, save it locally, and then upload it to S3!

**Replace the `<BUCKET>` value below with your own S3 bucket.**

In [None]:
library(gganimate)

# Animate the static plot over the numeric `year` column and show the current
# year in the subtitle.
spl_frame <- spl +
  transition_time(year) +
  labs(subtitle = "Year: {as.integer(frame_time)}")

# Render the animation as a GIF.
animate(
  spl_frame,
  renderer = gifski_renderer(),
  width = 800,
  height = 500,
  res = 100,
  fps = 30,
  duration = 10,
  end_pause = 60
)

# Write it to local disk; with no animation argument, anim_save() saves the
# most recently rendered animation.
anim_save("/tmp/seattle_temps_2000s.gif")

In [None]:
# Now copy the file to S3
library("aws.s3")

# Upload the rendered GIF to the bucket.
# The last argument has no trailing comma: in R a trailing comma passes an
# extra, empty argument to the function.
# NOTE(review): the object key reuses the local path, leading slash included;
# confirm that is the intended key in the bucket.
put_object(
  file = "/tmp/seattle_temps_2000s.gif",
  object = "/tmp/seattle_temps_2000s.gif",
  bucket = "<BUCKET>"
)