### Local


In [2]:
import numpy as np
import pandas as pd
import os
import sys

os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

In [6]:
# load pickle file US_1000.pkl
df = pd.read_pickle('US_1000.pkl')
print(len(df))

72993


In [11]:
# get unique parameter
df.parameter.unique()

array(['pm10', 'pm25', '-68.016195'], dtype=object)

### SDSC

In [1]:
# Importing libraries
import numpy as np 
import pandas as pd 
import os
import sys
from pyspark.sql import SparkSession
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

from pyspark.sql import SparkSession


# --- Creating a spark session ---
# SDSC limits: 128 executors, 250 GB memory

# SET: use to the values used to start this SDSC Jupyter session
node_memory = 240
node_cores = 80

# SET: how many cores you want allocated to the driver node
driver_cores = 10

executor_cores = node_cores - driver_cores
executor_memory = node_memory // executor_cores
driver_memory = node_memory - (executor_cores * executor_memory)

spark = SparkSession.builder \
    .config("spark.driver.memory", f'{driver_memory}g') \
    .config("spark.executor.memory", f'{executor_memory}g')\
    .config('spark.executor.instances', executor_cores) \
    .getOrCreate()

In [2]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, DecimalType
# Loading the parsed pickled RDD
pickledRDD = spark.sparkContext.pickleFile("/expanse/lustre/projects/uci150/ameek1/df_pkl_parsed")
schema = StructType([
    StructField("location_id", IntegerType(), True),
    StructField("sensors_id", IntegerType(), True),
    StructField("location", StringType(), True),
    StructField("datetime", TimestampType(), True),
    StructField("lat", DecimalType(precision=10, scale=6), True),
    StructField("lon", DecimalType(precision=10, scale=6), True),
    StructField("parameter", StringType(), True),
    StructField("units", StringType(), True),
    StructField("value", DecimalType(precision=10, scale=6), True),
    StructField("date", StringType(), True),  
    StructField("time", StringType(), True)   

])
# Converting the RDD to a DataFrame
df = spark.createDataFrame(pickledRDD, schema)

In [3]:
df.columns

['location_id',
 'sensors_id',
 'location',
 'datetime',
 'lat',
 'lon',
 'parameter',
 'units',
 'value',
 'date',
 'time']

In [4]:
import pyspark.sql.functions as F

In [6]:
# group by station, and count the number of distinct parameters (which are the number of pollutants recorded)
# filter to only include stations with at least 5

grouped_df = df.groupBy('location_id').agg(F.countDistinct('parameter').alias('pollutant_count'))
filtered_df = grouped_df.filter(F.col('pollutant_count') >= 5)

In [7]:
filtered_df.count()

36

In [11]:
z = filtered_df.

In [13]:
z

[Row(location_id=1265, pollutant_count=6),
 Row(location_id=1533, pollutant_count=7),
 Row(location_id=1226, pollutant_count=6),
 Row(location_id=1496, pollutant_count=5),
 Row(location_id=1122, pollutant_count=5),
 Row(location_id=1113, pollutant_count=7),
 Row(location_id=1091, pollutant_count=5),
 Row(location_id=1142, pollutant_count=6),
 Row(location_id=1236, pollutant_count=6),
 Row(location_id=1297, pollutant_count=8),
 Row(location_id=1289, pollutant_count=7),
 Row(location_id=1285, pollutant_count=6),
 Row(location_id=1064, pollutant_count=8),
 Row(location_id=1181, pollutant_count=5),
 Row(location_id=1540, pollutant_count=8),
 Row(location_id=1305, pollutant_count=6),
 Row(location_id=1200, pollutant_count=7),
 Row(location_id=1120, pollutant_count=6),
 Row(location_id=1311, pollutant_count=6),
 Row(location_id=1069, pollutant_count=7),
 Row(location_id=1518, pollutant_count=6),
 Row(location_id=10363, pollutant_count=5),
 Row(location_id=1375, pollutant_count=5),
 Row(locat