In [1]:
!pip install pyspark
!pip install findspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=23631ce0748d6f0e604139f23e08da0c5a2caa4014c641233850f6ea3302337e
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [2]:
import findspark
findspark.init()
from pyspark.sql import SQLContext, SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Configure the session first, then create it (or reuse one that already exists).
session_builder = (
    SparkSession.builder
    .master("local[*]")                 # run locally, one worker thread per core
    .appName("Combine CASTNET Files")
    .config("spark.ui.port", "4050")
)
spark = session_builder.getOrCreate()

# With eager evaluation on, a DataFrame shown as cell output renders its first rows.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [4]:
import os
import shutil
import sys
import time
import numpy as np
import pandas as pd
from google.colab import drive

# Mount Google Drive inside the Colab runtime.
drive_root = "/content/drive"
drive.mount(drive_root)

# Drive folder that holds the input files read by the later cells.
file_directory = os.path.join(drive_root, "MyDrive", "AML Group 24", "Files")
# The listing itself is discarded (it is not the cell's last expression);
# the call raises if the folder is not reachable after mounting.
os.listdir(file_directory)

# Local folder where save_df_to_parquet writes its results.
FILE_OUTPUT = "output"
# exist_ok=True replaces the check-then-create pair and keeps the cell re-runnable.
os.makedirs(FILE_OUTPUT, exist_ok=True)

Mounted at /content/drive


In [5]:
def save_df_to_parquet(df, output_folder):
  """Write `df` as one parquet file at FILE_OUTPUT/<output_folder>.snappy.parquet.

  Spark writes a directory of part files. This helper coalesces the DataFrame
  to a single partition, writes it to a staging directory, moves the lone
  parquet part file to a stable name next to that directory, removes the
  staging directory, and prints the final file size in MiB.

  Args:
    df: Spark DataFrame to persist.
    output_folder: Base name used for the staging directory and for the
      final `<output_folder>.snappy.parquet` file, both under FILE_OUTPUT.

  Raises:
    AssertionError: If the staging directory does not contain exactly one
      parquet part file after the write.
  """
  staging_dir = os.path.join(FILE_OUTPUT, output_folder)
  final_path = os.path.join(FILE_OUTPUT, f"{output_folder}.snappy.parquet")

  # "overwrite" instead of "append": part files left in the staging directory
  # by an interrupted earlier run are replaced rather than added to, so
  # re-running the cell cannot trip the single-file check below.
  df.coalesce(1).write.format("parquet").mode("overwrite").save(staging_dir)

  parquet_files = [name for name in os.listdir(staging_dir) if name.endswith(".parquet")]
  assert len(parquet_files) == 1, (
      f"Expected exactly one parquet part file in {staging_dir}, found {len(parquet_files)}"
  )

  # Promote the part file to its final name, then drop Spark's bookkeeping files.
  shutil.move(os.path.join(staging_dir, parquet_files[0]), final_path)
  shutil.rmtree(staging_dir)

  # File size in MiB.
  print(os.stat(final_path).st_size/(1024**2))

# Site

In [12]:
# CSV reader options: the first row of site.csv holds the column names.
site_options = {"header": True}

# (column name, Spark type) pairs in file order, excluding SITE_ID which is
# added separately below because it is the only column declared non-nullable.
site_fields = [
    ("SITE_NUM", DecimalType(38,0)),
    ("SITE_NAME", StringType()),
    ("ACTIVE", StringType()),
    ("INACTIVE", StringType()),
    ("AGENCY", StringType()),
    ("STATE", StringType()),
    ("COUNTY", StringType()),
    ("TIME_ZONE", StringType()),
    ("LATITUDE", DecimalType(14,6)),
    ("LONGITUDE", DecimalType(14,6)),
    ("ELEVATION", DecimalType(20,5)),
    ("MAPID", StringType()),
    ("LAND_USE", StringType()),
    ("TERRAIN", StringType()),
    ("MLM", StringType()),
    ("NADP_ID", StringType()),
    ("NADP_DISTANCE", DecimalType(12,4)),
    ("UPDATE_DATE", StringType()),
]

schema = StructType()
# Declared non-nullable, although the schema printed after loading still
# reports SITE_ID as nullable.
schema.add("SITE_ID", StringType(), False)
for field_name, field_type in site_fields:
    schema.add(field_name, field_type)

site_file = os.path.join(file_directory, "site.csv")
df_site = (
    spark.read
    .format("csv")
    .schema(schema)
    .options(**site_options)
    .load(site_file)
)

# Full schema as loaded.
df_site.printSchema()


# Keep only the site attributes used downstream, in their original column order.
site_columns_to_keep = ["SITE_ID", "LATITUDE", "LONGITUDE", "ELEVATION", "LAND_USE", "TERRAIN"]

kept_columns = [column for column in df_site.columns if column in site_columns_to_keep]
df_site = df_site.select(kept_columns)

# Schema after trimming.
df_site.printSchema()

root
 |-- SITE_ID: string (nullable = true)
 |-- SITE_NUM: decimal(38,0) (nullable = true)
 |-- SITE_NAME: string (nullable = true)
 |-- ACTIVE: string (nullable = true)
 |-- INACTIVE: string (nullable = true)
 |-- AGENCY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- COUNTY: string (nullable = true)
 |-- TIME_ZONE: string (nullable = true)
 |-- LATITUDE: decimal(14,6) (nullable = true)
 |-- LONGITUDE: decimal(14,6) (nullable = true)
 |-- ELEVATION: decimal(20,5) (nullable = true)
 |-- MAPID: string (nullable = true)
 |-- LAND_USE: string (nullable = true)
 |-- TERRAIN: string (nullable = true)
 |-- MLM: string (nullable = true)
 |-- NADP_ID: string (nullable = true)
 |-- NADP_DISTANCE: decimal(12,4) (nullable = true)
 |-- UPDATE_DATE: string (nullable = true)

root
 |-- SITE_ID: string (nullable = true)
 |-- LATITUDE: decimal(14,6) (nullable = true)
 |-- LONGITUDE: decimal(14,6) (nullable = true)
 |-- ELEVATION: decimal(20,5) (nullable = true)
 |-- LAND_USE: strin

In [11]:
# Render the trimmed site DataFrame (only the kept columns) as the cell output.
display(df_site)

SITE_ID,LATITUDE,LONGITUDE,ELEVATION,LAND_USE,TERRAIN
ABT147,41.84046,-72.010368,202.0,Urban/Agric,Rolling
ACA416,44.377086,-68.2608,158.0,Forest,Rolling
ALB801,53.6824,-112.868,711.0,Forest,Flat
ALC188,30.701577,-94.674011,105.0,Prairie,Rolling
ALH157,38.869,-89.6228,164.0,Agric,Flat
ALH257,38.869001,-89.622815,164.0,Agric,Flat
ALH557,38.869001,-89.622815,164.0,Agric,Flat
ANA115,42.416636,-83.90218,266.0,Forest,Flat
ANL146,41.7,-87.99,229.0,Urban/Agric,Rolling
ARE128,39.923241,-77.307863,266.0,Agric,Rolling


In [32]:
# Persist the trimmed site table under FILE_OUTPUT as site.snappy.parquet;
# save_df_to_parquet prints the resulting file size in MiB.
output_folder = "site"
save_df_to_parquet(df_site, output_folder)

0.005766868591308594


# Hourly Gas

In [13]:
gas_file = os.path.join(file_directory, "hourly_gas_combined.snappy.parquet")

# PARAMETER values that are removed from the gas table.
excluded_parameters = ["O3_23M", "TEMP_23M", "O3_10M", "O3_14M", "O3_17M", "O3_20M", "O3_28M", "O3_2M", "O3_6M", "TEMP_2M"]
# QA_CODE values that are kept.
accepted_qa_codes = ["1", "3"]
# VALUE_F flags that are kept (rows with no flag at all are kept as well).
accepted_value_flags = ("<", "A", "S", "U", "Z")

value_flag = F.col("VALUE_F")

df_gas = (
    spark.read.format("parquet").load(gas_file)
    .where(~F.col("PARAMETER").isin(excluded_parameters))
    .where(F.col("QA_CODE").isin(accepted_qa_codes))
    .where(value_flag.isNull() | value_flag.isin(*accepted_value_flags))
    # Long format: one row per parameter reading; pivoted in a later cell.
    .select(["SITE_ID", "DATE_TIME", "PARAMETER", "VALUE"])
)
print(f"Original Count: {df_gas.count()}")

# Distinct (SITE_ID, DATE_TIME) pairs = number of rows the pivot should produce.
df_just_2_pk = df_gas.select("SITE_ID", "DATE_TIME").distinct()
print(f"Number of rows in pivot: {df_just_2_pk.count()}")

Original Count: 2475297
Number of rows in pivot: 624865


In [14]:
# One row per (SITE_ID, DATE_TIME); each PARAMETER value becomes a column
# holding the sum of VALUE for that key.
df_gas_pivot = (
    df_gas
    .groupBy("SITE_ID", "DATE_TIME")
    .pivot("PARAMETER")
    .sum("VALUE")
)
pivot_row_count = df_gas_pivot.count()
print(f"Pivot Count: {pivot_row_count}")

Pivot Count: 624865


In [15]:
# Persist the pivoted gas table under FILE_OUTPUT as gas_pivot.snappy.parquet;
# save_df_to_parquet prints the resulting file size in MiB.
output_folder = "gas_pivot"
save_df_to_parquet(df_gas_pivot, output_folder)

10.484942436218262


In [16]:
# Drop the notebook's references; the pivot is reloaded from its parquet file in a later cell.
del df_just_2_pk, df_gas_pivot

In [17]:
# Read the saved file back; its row count should equal the pivot count printed earlier.
gas_pivot_saved = spark.read.format("parquet").load("output/gas_pivot.snappy.parquet")
gas_pivot_saved.count()

624865

# Met

* We will just filter our label OZONE by the OZONE_F flag to keep only valid data
* For deeper analysis, we will see which features we can completely drop before we apply the corresponding _F filter

In [18]:
met_file = os.path.join(file_directory, "metdata_combined.snappy.parquet")

ozone_flag = F.col("OZONE_F")
# OZONE_F flags that are kept (rows with no flag at all are kept as well).
accepted_ozone_flags = ["<", "E", "K", "S", "^", "_"]

df_met = (
    spark.read.format("parquet").load(met_file)
    .filter(ozone_flag.isNull() | ozone_flag.isin(accepted_ozone_flags))
    # NOTE(review): a NULL QA_CODE makes this comparison NULL, so those rows
    # are dropped together with the "X" rows; confirm that is intended.
    .where(F.col("QA_CODE") != "X")
)

print(df_met.count())

# Other _F
#~ (['&', '<', 'A', 'E', 'K', 'Q', 'S', 'W', 'Y', '^', '_']

7017729


In [19]:
# Persist the filtered met table under FILE_OUTPUT as met.snappy.parquet;
# save_df_to_parquet prints the resulting file size in MiB.
output_folder = "met"
save_df_to_parquet(df_met, output_folder)

92.95935249328613


In [20]:
# Read the saved file back; its row count should equal the count printed before saving.
met_saved = spark.read.format("parquet").load("output/met.snappy.parquet")
met_saved.count()

7017729

In [21]:
# Drop the reference; the join cell below reloads met from its saved parquet file.
del df_met

# Join met with gas

In [22]:
# Reload both inputs from the parquet files written by the earlier cells.
gas_pivot_path = os.path.join("output", "gas_pivot.snappy.parquet")
met_path = os.path.join("output", "met.snappy.parquet")
df_gas_pivot = spark.read.format("parquet").load(gas_pivot_path)
df_met = spark.read.format("parquet").load(met_path)

# Inner join: only (SITE_ID, DATE_TIME) pairs present in both tables survive.
met_gas_keys = ["SITE_ID", "DATE_TIME"]
df_met_gas = df_met.join(df_gas_pivot, on=met_gas_keys, how="inner")

df_met_gas.count()

574851

In [23]:
# Column count of the joined met + gas DataFrame.
len(df_met_gas.columns)

46

In [24]:
# Persist the met + gas join under FILE_OUTPUT as met_gas.snappy.parquet;
# save_df_to_parquet prints the resulting file size in MiB.
output_folder = "met_gas"
save_df_to_parquet(df_met_gas, output_folder)

17.029431343078613


In [25]:
# Drop the reference; a later cell reloads this data from its saved parquet file.
del df_met_gas

In [26]:
# Read the saved file back; its row count should equal the join count shown earlier.
met_gas_saved = spark.read.format("parquet").load("output/met_gas.snappy.parquet")
met_gas_saved.count()

574851

# Join met with gas with site

In [27]:
# Reload the met + gas join from disk.
met_gas_file = os.path.join("output", "met_gas.snappy.parquet")
df_met_gas = spark.read.format("parquet").load(met_gas_file)

# Attach the site attributes; inner join on SITE_ID against the df_site
# DataFrame that is still in memory from the Site section.
site_keys = ["SITE_ID"]
df_met_gas_site = df_met_gas.join(df_site, on=site_keys, how="inner")
df_met_gas_site.count()

574851

In [28]:
# Print column names and types of the joined met + gas + site DataFrame.
df_met_gas_site.printSchema()

root
 |-- SITE_ID: string (nullable = true)
 |-- DATE_TIME: timestamp (nullable = true)
 |-- TEMPERATURE: decimal(16,4) (nullable = true)
 |-- TEMPERATURE_F: string (nullable = true)
 |-- TEMPERATURE_DELTA: decimal(16,4) (nullable = true)
 |-- TEMPERATURE_DELTA_F: string (nullable = true)
 |-- RELATIVE_HUMIDITY: decimal(16,4) (nullable = true)
 |-- RELATIVE_HUMIDITY_F: string (nullable = true)
 |-- SOLAR_RADIATION: decimal(16,4) (nullable = true)
 |-- SOLAR_RADIATION_F: string (nullable = true)
 |-- OZONE: decimal(16,4) (nullable = true)
 |-- OZONE_F: string (nullable = true)
 |-- PRECIPITATION: decimal(16,4) (nullable = true)
 |-- PRECIPITATION_F: string (nullable = true)
 |-- WINDSPEED: decimal(16,4) (nullable = true)
 |-- WINDSPEED_F: string (nullable = true)
 |-- WIND_DIRECTION: decimal(16,4) (nullable = true)
 |-- WIND_DIRECTION_F: string (nullable = true)
 |-- SIGMA_THETA: decimal(16,4) (nullable = true)
 |-- SIGMA_THETA_F: string (nullable = true)
 |-- FLOW_RATE: decimal(16,4) (

In [29]:
# Persist the final joined table under FILE_OUTPUT as met_gas_site.snappy.parquet;
# save_df_to_parquet prints the resulting file size in MiB.
output_folder = "met_gas_site"
save_df_to_parquet(df_met_gas_site, output_folder)

17.274246215820312


In [30]:
# Read the saved file back; its row count should equal the join count shown earlier.
met_gas_site_saved = spark.read.format("parquet").load("output/met_gas_site.snappy.parquet")
met_gas_site_saved.count()

574851

In [31]:
# Load the final saved table and render it as the cell output.
result = spark.read.format("parquet").load("output/met_gas_site.snappy.parquet")
display(result)

SITE_ID,DATE_TIME,TEMPERATURE,TEMPERATURE_F,TEMPERATURE_DELTA,TEMPERATURE_DELTA_F,RELATIVE_HUMIDITY,RELATIVE_HUMIDITY_F,SOLAR_RADIATION,SOLAR_RADIATION_F,OZONE,OZONE_F,PRECIPITATION,PRECIPITATION_F,WINDSPEED,WINDSPEED_F,WIND_DIRECTION,WIND_DIRECTION_F,SIGMA_THETA,SIGMA_THETA_F,FLOW_RATE,FLOW_RATE_F,WINDSPEED_SCALAR,WINDSPEED_SCALAR_F,WETNESS,WETNESS_F,SHELTER_TEMPERATURE,SHELTER_TEMPERATURE_F,QA_CODE,UPDATE_DATE,Filename,CO,FLOW_23M,FLOW_2M,HNO3,NH3,NO,NO2_TRUE,NOX,NOXDIF,NOX_TRUE,NOY,NOYDIF,NOY_MINUS,SO2_GA,TNX,LATITUDE,LONGITUDE,ELEVATION,LAND_USE,TERRAIN
BEL116,2013-01-01 22:00:00,3.859,,0.044,,67.79,,0.475,,23.32,,0.0,,1.008,,318.5,,36.97,,1.515,,1.193,,0.0,,27.05,,3,2013-01-01 22:00:00,metdata_2013.csv,,,,,,0.029,,,,,,,,0.999,,39.028177,-76.817127,47.0,Range,Flat
BEL116,2013-01-02 03:00:00,1.573,,0.057,,63.01,,0.471,,30.53,,0.0,,0.725,,5.038,,38.44,,1.515,,0.869,,0.0,,26.56,,3,2013-01-02 03:00:00,metdata_2013.csv,,,,,,0.025,,,,,,,,2.382,,39.028177,-76.817127,47.0,Range,Flat
BEL116,2013-01-02 07:00:00,-0.347,,0.175,,64.32,,9.45,,28.37,,0.0,,0.674,,352.5,,34.32,,1.515,,0.786,,0.0,,25.69,,3,2013-01-02 07:00:00,metdata_2013.csv,,,,,,0.031,,,,,,,,2.926,,39.028177,-76.817127,47.0,Range,Flat
BEL116,2013-01-02 08:00:00,-0.44,,0.053,,64.22,,106.4,,26.0,,0.0,,0.818,,316.4,,34.06,,1.515,,0.941,,0.0,,25.34,,3,2013-01-02 08:00:00,metdata_2013.csv,,,,,,0.733,,,,,,,,2.835,,39.028177,-76.817127,47.0,Range,Flat
BEL116,2013-01-02 15:00:00,1.486,,-0.193,,44.63,S,182.8,,33.69,,0.0,,1.114,,344.2,,51.09,,1.515,,1.614,,0.0,,26.69,,3,2013-01-02 15:00:00,metdata_2013.csv,,,,,,0.713,,,,,,,,1.317,,39.028177,-76.817127,47.0,Range,Flat
BEL116,2013-01-03 05:00:00,-4.127,,0.836,,72.79,,0.431,,16.15,,0.0,,0.128,,58.43,,26.98,,1.515,,0.155,,0.0,,21.05,,3,2013-01-03 05:00:00,metdata_2013.csv,,,,,,-0.145,,,,,,,,1.105,,39.028177,-76.817127,47.0,Range,Flat
BEL116,2013-01-03 07:00:00,-6.634,,0.913,,89.9,,8.16,,9.41,,0.0,,0.056,,189.2,,40.67,,1.515,,0.095,,0.0,,20.23,,3,2013-01-03 07:00:00,metdata_2013.csv,,,,,,0.199,,,,,,,,0.345,,39.028177,-76.817127,47.0,Range,Flat
BEL116,2013-01-03 13:00:00,1.94,,-0.521,,46.87,S,347.6,,27.59,,0.0,,2.677,,257.6,,20.74,,1.515,,2.836,,0.0,,23.97,,3,2013-01-03 13:00:00,metdata_2013.csv,,,,,,4.233,,,,,,,,1.913,,39.028177,-76.817127,47.0,Range,Flat
BEL116,2013-01-03 15:00:00,3.047,,-0.255,,42.12,S,215.0,,29.64,,0.0,,3.022,,250.3,,13.3,,1.515,,3.119,,0.0,,25.13,,3,2013-01-03 15:00:00,metdata_2013.csv,,,,,,2.183,,,,,,,,1.293,,39.028177,-76.817127,47.0,Range,Flat
BEL116,2013-01-03 17:00:00,-0.086,,1.301,,59.27,,1.11,,15.17,,0.0,,0.183,,171.8,,48.83,,1.515,,0.491,,0.0,,25.02,,3,2013-01-03 17:00:00,metdata_2013.csv,,,,,,0.033,,,,,,,,0.995,,39.028177,-76.817127,47.0,Range,Flat
