# Serverless Data Prep with Glue Interactive Sessions
We can scale our data preparation with serverless Spark or Ray through the native integration with AWS Glue Interactive Sessions.

### What is AWS Glue?

AWS Glue is a serverless data integration service that makes it easier to discover, prepare, move, and integrate data from multiple sources for analytics, machine learning (ML), and application development.

![img](https://d1.awsstatic.com/reInvent/reinvent-2022/glue/Product-Page-Diagram_AWS-Glue_for-Ray%402x.f34b47cf0280c7d843ea457b704ea512bebd91d5.png)


## Objective 

We want to predict the amount of NO2 in the Seattle area based on weather conditions.

![img](https://upload.wikimedia.org/wikipedia/commons/thumb/b/b1/Origins_of_acid_rain.svg/1280px-Origins_of_acid_rain.svg.png)


### Datasets in our Example

[OpenAQ Physical Air Quality Data](https://registry.opendata.aws/openaq/):
* Global, aggregated physical air quality data from public data sources provided by government, research-grade and other sources.
* 42 GB of data


[NOAA Global Surface Summary of Day](https://registry.opendata.aws/noaa-gsod/):
* Global summary-of-day data for 18 surface meteorological elements, derived from the synoptic/hourly observations in the USAF DATSAV3 Surface data and the Federal Climate Complex Integrated Surface Hourly (ISH) dataset.


### Set Configurations

In [None]:
%help

In [None]:
%session_id_prefix air-analysis
%glue_version 3.0
%number_of_workers 10
%idle_timeout 180

In [None]:
print(spark.version)

In [None]:
bucket = "YOUR_S3_BUCKET"  # replace with the name of your own S3 bucket

In [None]:
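# Infer the schema from a single file, then apply it to every file;
# this avoids a separate schema-inference pass over the full dataset
schema_df = spark.read.json("s3://openaq-fetches/realtime-gzipped/2022-01-05/1641340870.ndjson.gz")
df = spark.read.schema(schema_df.schema).json("s3://openaq-fetches/realtime-gzipped/20*")
df.show()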
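By default, Spark's JSON reader infers a schema by reading through the input, so the cell above infers it from one file and reuses it for everything else. The pure-Python sketch below only illustrates that idea and is not how Spark implements it. The records are hypothetical and keep just the fields this notebook uses (`date.utc`, `city`, `parameter`, `value`). The helpers `field_paths` and `get_path` are illustrative names. With a fixed schema, fields missing from it are dropped and absent fields come back as `None`.

```python
import json

# Hypothetical NDJSON lines shaped like OpenAQ records (made-up values, only the
# fields used in this notebook; the second line carries an extra field)
ndjson = """\
{"date": {"utc": "2022-01-05T00:00:00.000Z"}, "city": "Seattle-Tacoma-Bellevue", "parameter": "no2", "value": 12.5}
{"date": {"utc": "2022-01-05T01:00:00.000Z"}, "city": "Portland", "parameter": "pm25", "value": 4.0, "extra": true}
"""


def field_paths(record, prefix=""):
    """Return the dotted paths of all leaf fields in a nested dict, e.g. 'date.utc'."""
    paths = []
    for key, val in record.items():
        path = f"{prefix}{key}"
        if isinstance(val, dict):
            paths.extend(field_paths(val, path + "."))
        else:
            paths.append(path)
    return paths


def get_path(record, path):
    """Follow a dotted path into a nested dict; a missing field yields None."""
    for key in path.split("."):
        if not isinstance(record, dict) or key not in record:
            return None
        record = record[key]
    return record


lines = ndjson.splitlines()
# 1) "Infer" the schema from the first record only
schema = field_paths(json.loads(lines[0]))
# 2) Apply that fixed schema to every record: unknown fields are dropped
rows = [{p: get_path(json.loads(line), p) for p in schema} for line in lines]
```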

In [None]:
from pyspark.sql.functions import split, lower, to_date

# Keep only Seattle NO2 readings and extract the year ("YYYY") from the UTC timestamp string
dfSea = (df.filter(lower(df.city).contains("seattle"))
           .filter(df.parameter == "no2")
           .withColumn("year", split(df.date.utc, "-")[0])
           .cache())
dfSea.show(truncate=False)
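The filter in the cell above does a case-insensitive substring match on the city and an exact match on the parameter. It then takes the text before the first hyphen of the UTC timestamp as the year. Here is a plain-Python sketch of the same row-level logic. The rows and the helper `seattle_no2` are hypothetical and exist only for illustration.

```python
# Hypothetical rows mimicking the columns used above (city, parameter, date.utc, value)
rows = [
    {"city": "Seattle-Tacoma-Bellevue", "parameter": "no2", "utc": "2021-12-31T23:00:00.000Z", "value": 20.0},
    {"city": "SEATTLE", "parameter": "pm25", "utc": "2022-01-01T00:00:00.000Z", "value": 5.0},
    {"city": "Tacoma", "parameter": "no2", "utc": "2022-01-01T00:00:00.000Z", "value": 9.0},
]


def seattle_no2(rows):
    """Keep NO2 rows whose city contains 'seattle' (any case) and add a 'year' string."""
    out = []
    for row in rows:
        if "seattle" in row["city"].lower() and row["parameter"] == "no2":
            # Equivalent of split(date.utc, "-")[0]: the text before the first hyphen
            out.append({**row, "year": row["utc"].split("-")[0]})
    return out


filtered = seattle_no2(rows)
```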

In [None]:
# Average the NO2 readings per calendar day (UTC)
dfNoAvg = dfSea.withColumn("ymd", to_date(dfSea.date.utc)).groupBy("ymd").avg("value").withColumnRenamed("avg(value)", "no2_avg")
dfNoAvg.show()
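`to_date` truncates each timestamp to its calendar date. `groupBy(...).avg(...)` then averages all readings that share a date. The sketch below reproduces that group-and-average step in plain Python on hypothetical readings. Taking the first ten characters of the ISO string stands in for `to_date`, under the assumption that timestamps are formatted like `2022-01-05T00:00:00.000Z`.

```python
from collections import defaultdict
from datetime import date

# Hypothetical filtered NO2 readings: (UTC timestamp string, value)
readings = [
    ("2022-01-05T00:00:00.000Z", 10.0),
    ("2022-01-05T12:00:00.000Z", 20.0),
    ("2022-01-06T03:00:00.000Z", 7.0),
]


def daily_average(readings):
    """Group readings by calendar day (like to_date(date.utc)) and average each group."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for ts, value in readings:
        day = date.fromisoformat(ts[:10])  # keep only the YYYY-MM-DD part
        sums[day] += value
        counts[day] += 1
    return {day: sums[day] / counts[day] for day in sums}


no2_avg = daily_average(readings)
```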

In [None]:
# Write to S3
dfNoAvg.coalesce(1).write.parquet(f"s3://{bucket}/subset-aggregate-no2.parquet")

In [None]:
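from pyspark.sql import functions as F  # aliased so Python's built-in min/max aren't shadowed
# First and last year in the NO2 data; used below to choose which NOAA years to read
year_min, year_max = dfNoAvg.select(F.year(F.min("ymd")), F.year(F.max("ymd"))).first()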
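The year bounds computed above drive the weather cell, which reads one NOAA GSOD folder per year with `range(year_min, year_max + 1)`. This plain-Python sketch uses hypothetical dates standing in for the `ymd` column. It shows why the `+ 1` is needed: `range` excludes its end, and without it the final year would be skipped.

```python
from datetime import date

# Hypothetical daily dates standing in for the "ymd" column
days = [date(2021, 3, 2), date(2019, 7, 14), date(2022, 1, 5)]

# Same idea as selecting year(min("ymd")) and year(max("ymd"))
year_min, year_max = min(days).year, max(days).year

# range() excludes its end value, so +1 keeps the last year in the list
years_to_read = list(range(year_min, year_max + 1))
```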

## Weather

In [None]:
from pyspark.sql.types import DoubleType
from pyspark.sql import functions as F

# Scope to Seattle, WA, USA
longLeft, latBottom, longRight, latTop = [-122.459696,47.481002,-122.224433,47.734136]

# Infer the CSV schema once from a single station file and reuse it for every year
dfSchema = spark.read.csv("s3://noaa-gsod-pds/2022/32509099999.csv", header=True, inferSchema=True)

# Read the first year, then union in the remaining years
# (`yr` is used instead of `year` to avoid shadowing pyspark.sql.functions.year)
def read_year(yr):
    return spark.read.csv(f"s3://noaa-gsod-pds/{yr}/", header=True, schema=dfSchema.schema)

year_range = range(year_min, year_max + 1)
df = read_year(year_range[0])
for yr in year_range[1:]:
    df = df.union(read_year(yr))

df = df \
        .withColumn('LATITUDE', df.LATITUDE.cast(DoubleType())) \
        .withColumn('LONGITUDE', df.LONGITUDE.cast(DoubleType()))

seadf = df \
        .filter(df.LATITUDE >= latBottom) \
        .filter(df.LATITUDE <= latTop) \
        .filter(df.LONGITUDE >= longLeft) \
        .filter(df.LONGITUDE <= longRight)

# Rename columns so they're easier to read
seafeatures = seadf.selectExpr("Date as date", "MAX as temp_max", "MIN as temp_min", "WDSP as wind_avg", "SLP as pressure_sea_level", "STP as pressure_station", "VISIB as visibility")

# Replace GSOD "no data" sentinel values with nulls so they don't skew the daily averages
no_data_mappings = [
    ("temp_max", 9999.9),
    ("temp_min", 9999.9),
    ("wind_avg", 999.9),
    ("pressure_sea_level", 9999.9),
    ("pressure_station", 9999.9),
    ("visibility", 999.9),
]
for name, val in no_data_mappings:
    seafeatures = seafeatures.withColumn(name, F.when(F.col(name) == val, None).otherwise(F.col(name)))

# Now average each reading per day
seafeatures = seafeatures.groupBy("date").agg(*[F.mean(c).alias(c) for c in seafeatures.columns[1:]])
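The weather cell above has three steps. It keeps stations inside a bounding box, turns GSOD sentinel values into nulls, and averages each column per day. Spark's `mean` skips nulls, so the sentinels drop out of the averages instead of inflating them. The sketch below mirrors that logic in plain Python. It reuses the box and sentinel values from the cell, but the helper names (`in_bbox`, `clean`, `mean_ignoring_none`) and the sample readings are hypothetical.

```python
# Seattle bounding box from the cell above: (longLeft, latBottom, longRight, latTop)
BBOX = (-122.459696, 47.481002, -122.224433, 47.734136)

# GSOD "no data" sentinels per renamed column, as listed in no_data_mappings above
NO_DATA = {
    "temp_max": 9999.9,
    "temp_min": 9999.9,
    "wind_avg": 999.9,
    "pressure_sea_level": 9999.9,
    "pressure_station": 9999.9,
    "visibility": 999.9,
}


def in_bbox(lat, lon, bbox=BBOX):
    """True when a station lies inside the box (inclusive bounds, like the >= / <= filters)."""
    long_left, lat_bottom, long_right, lat_top = bbox
    return lat_bottom <= lat <= lat_top and long_left <= lon <= long_right


def clean(row):
    """Replace sentinel values with None; columns without a sentinel are left unchanged."""
    return {k: (None if k in NO_DATA and v == NO_DATA[k] else v) for k, v in row.items()}


def mean_ignoring_none(values):
    """Average only non-missing values, like Spark's mean(); None if nothing is left."""
    present = [v for v in values if v is not None]
    return sum(present) / len(present) if present else None


# Hypothetical readings from two Seattle stations on the same day
day_rows = [
    clean({"date": "2022-01-05", "temp_max": 45.0, "visibility": 999.9}),
    clean({"date": "2022-01-05", "temp_max": 9999.9, "visibility": 10.0}),
]
temp_max_avg = mean_ignoring_none([r["temp_max"] for r in day_rows])
visibility_avg = mean_ignoring_none([r["visibility"] for r in day_rows])
```

Without the sentinel step, the hypothetical `temp_max` average would be pulled far above any real temperature by the single 9999.9 reading.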

In [None]:
seafeatures.coalesce(1).write.parquet(f"s3://{bucket}/subset-seattle-weather.parquet")

## End the Session