In [33]:
from pyspark import SparkConf
import pyspark.sql.functions as psf
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame, Column

In [83]:
config = {"spark.jars.packages":"org.apache.hadoop:hadoop-aws:3.3.1", 
"spark.hadoop.fs.s3.impl":"org.apache.hadoop.fs.s3a.S3AFileSystem", 
"fs.s3a.aws.credentials.provider":"com.amazonaws.auth.DefaultAWSCredentialsProviderChain"}

conf = SparkConf().setAll(config.items())
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [92]:
df = spark.read.json("s3://dataminded-academy-capstone-resources/raw/open_aq/")

                                                                                

In [8]:
df.columns

['city',
 'coordinates',
 'country',
 'date',
 'entity',
 'isAnalysis',
 'isMobile',
 'location',
 'locationId',
 'parameter',
 'sensorType',
 'unit',
 'value']

In [14]:
df
# coordinates and date are nested

DataFrame[city: string, coordinates: struct<latitude:double,longitude:double>, country: string, date: struct<local:string,utc:string>, entity: string, isAnalysis: boolean, isMobile: boolean, location: string, locationId: bigint, parameter: string, sensorType: string, unit: string, value: double]

In [40]:
df.select("coordinates").show()

type(df.select("coordinates"))

# for element in df.select("coordinates"):
#     print(type(element))

+-----------------+
|      coordinates|
+-----------------+
| {50.904, 4.6959}|
| {50.904, 4.6959}|
| {50.904, 4.6959}|
| {50.921, 3.4562}|
| {50.921, 3.4562}|
| {50.921, 3.4562}|
|{50.9815, 3.5463}|
|{50.9815, 3.5463}|
|{50.9815, 3.5463}|
|{50.8864, 4.7003}|
|{50.8864, 4.7003}|
|{50.8864, 4.7003}|
|{51.2252, 3.7902}|
|{51.2252, 3.7902}|
|{51.2252, 3.7902}|
| {51.071, 4.7037}|
| {51.071, 4.7037}|
| {51.071, 4.7037}|
| {50.904, 4.6959}|
| {50.904, 4.6959}|
+-----------------+
only showing top 20 rows



pyspark.sql.dataframe.DataFrame

In [93]:
df = (
    df.withColumn(colName="coordinates_latitude", col=df["coordinates"].getField("latitude"))
    .withColumn(colName="coordinates_longitude", col=df["coordinates"].getField("longitude"))
    .drop("coordinates")
    .withColumn(colName="date_local", col=df["date"].getField("local"))
    .withColumn(colName="date_utc", col=df["date.utc"])
    .drop("date")
)

In [94]:
df.printSchema()

root
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- entity: string (nullable = true)
 |-- isAnalysis: boolean (nullable = true)
 |-- isMobile: boolean (nullable = true)
 |-- location: string (nullable = true)
 |-- locationId: long (nullable = true)
 |-- parameter: string (nullable = true)
 |-- sensorType: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- value: double (nullable = true)
 |-- coordinates_latitude: double (nullable = true)
 |-- coordinates_longitude: double (nullable = true)
 |-- date_local: string (nullable = true)
 |-- date_utc: string (nullable = true)



In [111]:
# psf.to_utc_timestamp()

+----+-------+---------+----------+--------+--------------------+----------+---------+---------------+-----+-----+--------------------+---------------------+--------------------+--------------------+
|city|country|   entity|isAnalysis|isMobile|            location|locationId|parameter|     sensorType| unit|value|coordinates_latitude|coordinates_longitude|          date_local|            date_utc|
+----+-------+---------+----------+--------+--------------------+----------+---------+---------------+-----+-----+--------------------+---------------------+--------------------+--------------------+
|null|     BE|community|     false|   false|      Wilsele-Herent|     66110|      pm1|low-cost sensor|µg/m³|  1.7|              50.904|               4.6959|2021-02-02T22:59:...|2021-02-02T23:59:...|
|null|     BE|community|     false|   false|      Wilsele-Herent|     66110|     pm25|low-cost sensor|µg/m³|  4.1|              50.904|               4.6959|2021-02-02T22:59:...|2021-02-02T23:59:...|


In [121]:
utc_df = df.withColumn(colName="UTC", col=df["date_utc"])
# 2021-02-02T23:59:

psf.to_utc_timestamp(utc_df.UTC, tz=None)
utc_df.printSchema()

root
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- entity: string (nullable = true)
 |-- isAnalysis: boolean (nullable = true)
 |-- isMobile: boolean (nullable = true)
 |-- location: string (nullable = true)
 |-- locationId: long (nullable = true)
 |-- parameter: string (nullable = true)
 |-- sensorType: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- value: double (nullable = true)
 |-- coordinates_latitude: double (nullable = true)
 |-- coordinates_longitude: double (nullable = true)
 |-- date_local: string (nullable = true)
 |-- date_utc: string (nullable = true)
 |-- UTC: string (nullable = true)

