# Pollution data transformation

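In this notebook I transform and enrich sensor data readings: pollution readings staged as JSON in Azure Blob Storage are joined with sensor locations from Azure SQL Database and a pollution-level lookup, and the result is saved back to Azure SQL Database.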

#### Configure connections

In [3]:
# Import packages
from pyspark.sql.functions import upper, col, desc, explode
from pyspark.sql.types import *

Connection strings

In [5]:
# Azure storage connection settings
# The account name is interpolated into the blob endpoint and the key is
# registered with the Spark configuration in the storage cell.
# NOTE(review): these look like placeholders to be replaced per environment.
# Avoid committing real keys or passwords in the notebook; a secret store
# would be safer - confirm how deployment injects these values.
storage_account_name = "STORAGE_ACCOUNT_NAME"
storage_account_key = "STORAGE_ACCOUNT_KEY"

# Azure SQL connection settings
# Host, port and database are combined into the JDBC URL; username and
# password are passed as JDBC connection properties.
jdbc_hostname = "AZURE_SQL_HOSTNAME"
jdbc_port = 1433
jdbc_database = "AZURE_SQL_DATABASE"
jdbc_username = "AZURE_SQL_USERNAME"
jdbc_password = "AZURE_SQL_PASSWORD"

Pipeline parameters

In [7]:
# Create the input widget that receives the parameter passed in via the
# ADF Databricks Notebook activity. The default is an empty string, so an
# interactive run must fill the widget in (for example "sensor-sink-stage").
dbutils.widgets.text("storage_container_name", "", "")

# Read the widget value into a variable. dbutils.widgets.get is used instead
# of the legacy getArgument helper, and its result is actually kept.
storage_container_name = dbutils.widgets.get("storage_container_name")

# Fail fast with a clear message: an empty container name would otherwise
# only surface later as a malformed wasbs:// URL.
if not storage_container_name.strip():
  raise ValueError("Pipeline parameter 'storage_container_name' is required but was empty")

Azure Storage Account

In [9]:
# Register the storage account key with Spark so the wasbs:// path below can be read.
# Values are interpolated explicitly rather than via "% locals()", so each
# string shows exactly which settings it depends on.
spark.conf.set(
  "fs.azure.account.key.{0}.blob.core.windows.net".format(storage_account_name),
  storage_account_key
)

# Root URL of the blob container that holds the staged sensor readings
storage_connection_string = "wasbs://{0}@{1}.blob.core.windows.net".format(
  storage_container_name,
  storage_account_name
)

# Create data frame: read every *.json file in the container (multi-line JSON
# documents) and rename the fields to the column names used by the join
pollution_readings_df = (
  spark.read
  .json("{0}/*.json".format(storage_connection_string), multiLine=True)
  .select(
    col("readingId").alias("ReadingId"),
    col("locationId").alias("FromLocationId"),
    col("pollutionLevel").alias("ReadPollutionLevel"),
    col("readingDateTime").alias("ReadingDateTime")
  )
)

Azure SQL Database

In [11]:
# JDBC URL built from the Azure SQL host, port and database settings
jdbc_url = "jdbc:sqlserver://{0}:{1};database={2}".format(
  jdbc_hostname,
  jdbc_port,
  jdbc_database
)

# Connection properties: credentials plus the SQL Server JDBC driver class
jdbc_connection_properties = {
  "user": jdbc_username,
  "password": jdbc_password,
  "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# Define query: a parenthesised subquery (aliased sl) passed as the JDBC "table"
sensor_locations_query = "(select Id, City, Country, Population, Latitude, Longitude from SensorLocations) sl"

# Create data frame: Id is renamed to LocationId, and City / Country are
# upper-cased; the remaining columns are passed through unchanged
sensor_locations_df = (
  spark.read
  .jdbc(
    url=jdbc_url,
    table=sensor_locations_query,
    properties=jdbc_connection_properties
  )
  .select(
    col("Id").alias("LocationId"),
    upper(col("City")).alias("City"),
    upper(col("Country")).alias("Country"),
    col("Population"),
    col("Latitude"),
    col("Longitude")
  )
)

Pollution levels

In [13]:
# Convert JSON strings to DataFrames
def convertJSONToDataFrame(json, schema=None):
  """Parse a single JSON document held in a string into a Spark DataFrame.

  Args:
    json: JSON text. It is wrapped in a one-element RDD via sc.parallelize
      and handed to the reader's json() method.
    schema: Optional schema passed to the reader's schema() method. When
      None, the reader is used without an explicit schema.

  Returns:
    Whatever reader.json() returns for the one-element RDD.
  """
  reader = spark.read
  # Compare against None instead of relying on truthiness: a schema object
  # that defines __len__ is falsy when empty, yet is still an explicit schema.
  if schema is not None:
    # Keep the reader returned by schema() rather than depending on the
    # call mutating the original reader in place.
    reader = reader.schema(schema)
  return reader.json(sc.parallelize([json]))

# Create schema: a single "values" column holding a string-to-string map
schema = StructType().add("values", MapType(StringType(), StringType()))

# Inline JSON lookup mapping each pollution level id to its label
pollution_levels_json = """
{
  "values": {
    "1": "GOOD",
    "2": "MODERATE",
    "3": "UNHEALTHY",
    "4": "VERY UNHEALTHY",
    "5": "HAZARDOUS"
  }
}
"""

# Create data frame: parse the lookup with the schema above, then explode the
# map so each entry becomes a (PollutionLevelId, PollutionLevel) row
pollution_level_values = (
  convertJSONToDataFrame(pollution_levels_json, schema)
  .select(explode("values").alias("PollutionLevelId", "PollutionLevel"))
)

#### Transform data

Join sensor reading data with location and pollution levels data

In [16]:
# Join conditions: reading -> sensor location, and reading -> pollution level label
location_match = pollution_readings_df.FromLocationId == sensor_locations_df.LocationId
level_match = pollution_readings_df.ReadPollutionLevel == pollution_level_values.PollutionLevelId

# Columns kept in the enriched output, in this order
output_columns = [
  "ReadingId",
  "ReadingDateTime",
  "PollutionLevelId",
  "PollutionLevel",
  "LocationId",
  "City",
  "Country",
  "Population",
  "Latitude",
  "Longitude"
]

# Join data frames (join()'s default join type), keep the output columns and
# order the rows by Population, largest first
pollution_data_join = (
  pollution_readings_df
  .join(sensor_locations_df, location_match)
  .join(pollution_level_values, level_match)
  .select(*output_columns)
  .sort(col("Population").desc())
)

#### Save data

Save the transformed data to Azure SQL Database.

In [19]:
# Write the enriched readings to the SensorReadings table over JDBC.
# NOTE(review): with mode='append', re-running this cell presumably inserts
# the same rows again - confirm that is intended for the pipeline.
pollution_data_join.write.jdbc(
  url=jdbc_url,
  table='SensorReadings',
  mode='append',
  properties=jdbc_connection_properties
)