In [23]:
import pandas as pd
import datetime
import os
import boto3

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

import sagemaker
from sagemaker import get_execution_role
import sagemaker_pyspark
from datetime import datetime, timedelta
from pyspark.sql.functions import to_date,date_format,col,from_unixtime,unix_timestamp

In [2]:
role = get_execution_role()
jars = sagemaker_pyspark.classpath_jars()

classpath = ":".join(sagemaker_pyspark.classpath_jars())
spark = (
    SparkSession.builder.config("spark.driver.extraClassPath", classpath)
    .master("local[*]")
    .getOrCreate()
)

spark

In [30]:
from pyspark.sql.functions import lit

region = boto3.Session().region_name
endpoint_domain = "com"
spark._jsc.hadoopConfiguration().set(
    "fs.s3a.endpoint", "s3.{}.amazonaws.{}".format(region, endpoint_domain)
)

first_date = "2022-01-01"

start_date = datetime.strptime('2022-01-02', '%Y-%m-%d') 
end_date = datetime.utcnow()

delta = end_date - start_date

day = ""
df = (
        spark.read.format("json")
        .option("numFeatures", "784")
        .load(f"s3a://openaq-fetches/realtime-gzipped/{first_date}".format(region))
    )
df = df.withColumn("u_date",lit(first_date)).withColumn("lat",col("coordinates.latitude")).withColumn("long",col("coordinates.longitude"))

for i in range(delta.days + 1):
    day = start_date + timedelta(days=i)
    day = day.strftime("%Y-%m-%d")

    temp_df = (
        spark.read.format("json")
        .option("numFeatures", "784")
        .load(f"s3a://openaq-fetches/realtime-gzipped/{day}".format(region))
    )
    temp_df = temp_df.withColumn("u_date",lit(day)).withColumn("lat",col("coordinates.latitude")).withColumn("long",col("coordinates.longitude"))

    diff1 = [c for c in temp_df.columns if c not in df.columns]
    diff2 = [c for c in df.columns if c not in temp_df.columns]
    df = df.select('*', *[lit(None).alias(c) for c in diff1]) \
    .unionByName(temp_df.select('*', *[lit(None).alias(c) for c in diff2]))


df.show()


In [31]:
df1 = df.filter(df.country == "US")
df1 = df1.select(['city','country','lat','long','u_date','location','parameter','unit','value'])
df1.show()

+-----------------+-------+---------+-----------+----------+--------------------+---------+-----+-----+
|             city|country|      lat|       long|    u_date|            location|parameter| unit|value|
+-----------------+-------+---------+-----------+----------+--------------------+---------+-----+-----+
|Birmingham-Hoover|     US|  33.5531|    -86.815|2022-01-01|            NO. BHAM|       o3|  ppm|0.021|
|Birmingham-Hoover|     US|  33.5531|    -86.815|2022-01-01|            NO. BHAM|     pm10|µg/m³| 16.0|
|Birmingham-Hoover|     US|  33.5531|    -86.815|2022-01-01|            NO. BHAM|     pm25|µg/m³|  8.0|
|Birmingham-Hoover|     US|  33.3311|   -87.0036|2022-01-01|             MCADORY|     pm25|µg/m³|  7.4|
|Birmingham-Hoover|     US|  33.4997|   -86.9242|2022-01-01|               WYLAM|     pm10|µg/m³| 17.0|
|Birmingham-Hoover|     US|  33.4997|   -86.9242|2022-01-01|               WYLAM|     pm25|µg/m³|  8.6|
|Birmingham-Hoover|     US|  33.8017|   -86.9425|2022-01-01|    

In [32]:
df2 = df1.withColumn("month",from_unixtime(unix_timestamp(col("u_date"),'yyyy-MM-DD'),'MMM'))
df3 = df2.groupBy(["city","month"]).agg({'value':'avg'})
df3.count()


1230

In [None]:
df3.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("s3a://openaq-analytics/2022/")