In [1]:
# Load the PCA feature table from S3 and register it as the `pca` temp view
# so it can be referenced from Spark SQL.
s3_url = 's3a://dse-cohort5-group5/wildfire_capstone/integratedData.pca.parquet.gz'
pca_df = spark.read.format('parquet').load(s3_url)
pca_df.createOrReplaceTempView('pca')

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,application_1587525179190_0004,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
# Load the integrated (renamed-column) dataset from S3 and register it as the
# `fire_occurrences` temp view so it can be referenced from Spark SQL.
base_df = spark.read.format('parquet').load(
    's3a://dse-cohort5-group5/wildfire_capstone/integratedData.renamed.parquet.gz'
)
base_df.createOrReplaceTempView("fire_occurrences")

In [3]:
# Print the column names and types of the integrated dataset.
base_df.printSchema()

root
 |-- date: long (nullable = true)
 |-- precipitation_amount_mm: double (nullable = true)
 |-- relative_humidity_%: double (nullable = true)
 |-- specific_humidity_kg/kg: double (nullable = true)
 |-- surface_downwelling_shortwave_flux_in_air_W_m-2: double (nullable = true)
 |-- wind_from_direction_Degrees_Clockwise_from_north: double (nullable = true)
 |-- wind_speed_m/s: double (nullable = true)
 |-- max_air_temperature_K: double (nullable = true)
 |-- min_air_temperature_K: double (nullable = true)
 |-- burning_index_g_Unitless: double (nullable = true)
 |-- dead_fuel_moisture_100hr_Percent: double (nullable = true)
 |-- dead_fuel_moisture_1000hr_Percent: double (nullable = true)
 |-- energy_release_component-g_Unitless: double (nullable = true)
 |-- potential_evapotranspiration_mm: double (nullable = true)
 |-- mean_vapor_pressure_deficit_kPa: double (nullable = true)
 |-- fire_occurred: integer (nullable = true)
 |-- acres_burned: double (nullable = true)
 |-- fire_name: strin

In [4]:
# Preview one row of the integrated dataset, one field per line (vertical layout).
base_df.show(1, vertical=True)

-RECORD 0---------------------------------------------------------------
 date                                             | 915148800000000000  
 precipitation_amount_mm                          | 0.0                 
 relative_humidity_%                              | 40.300000000000004  
 specific_humidity_kg/kg                          | 0.00589             
 surface_downwelling_shortwave_flux_in_air_W_m-2  | 138.0               
 wind_from_direction_Degrees_Clockwise_from_north | 123.0               
 wind_speed_m/s                                   | 1.6                 
 max_air_temperature_K                            | 293.1               
 min_air_temperature_K                            | 281.1               
 burning_index_g_Unitless                         | 24.0                
 dead_fuel_moisture_100hr_Percent                 | 16.0                
 dead_fuel_moisture_1000hr_Percent                | 15.5                
 energy_release_component-g_Unitless              |

In [5]:
# Print the column names and types of the PCA feature table.
pca_df.printSchema()

root
 |-- date: long (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- pcaFeatures: vector (nullable = true)

In [6]:
# Preview one row of the PCA feature table, one field per line (vertical layout).
pca_df.show(1, vertical=True)

-RECORD 0---------------------------
 date        | 915235200000000000   
 latitude    | 32.608333333333334   
 longitude   | -116.93333330000002  
 pcaFeatures | [-18.953460978093... 
only showing top 1 row

In [7]:
# Join the integrated observations (`fire_occurrences` view) with their PCA
# features (`pca` view) on date/latitude/longitude, keeping only rows where a
# fire occurred.
#
# - The join keys live in an explicit INNER JOIN ... ON clause so they cannot be
#   confused with (or accidentally dropped from) the row filter in WHERE.
# - `date` is divided by 1e9 before from_unixtime, i.e. it is treated as
#   nanoseconds since the epoch; year/month/day come back as formatted strings
#   ('yyyy', 'MM', 'dd').
# - NOTE(review): from_unixtime presumably formats in the Spark session time
#   zone -- confirm the session runs in UTC so the calendar fields are not shifted.
# - The table names are deliberately not aliased: the later select refers to the
#   columns as "fire_occurrences.<name>".
join_query = """
SELECT fire_occurrences.date,          fire_occurrences.latitude,     fire_occurrences.longitude,
       fire_occurrences.fire_occurred, fire_occurrences.acres_burned, pca.pcaFeatures,
       from_unixtime(cast(fire_occurrences.date/1e9 as long), 'yyyy') as year,
       from_unixtime(cast(fire_occurrences.date/1e9 as long), 'MM') as month,
       from_unixtime(cast(fire_occurrences.date/1e9 as long), 'dd') as day
FROM fire_occurrences
INNER JOIN pca
   ON pca.date      = fire_occurrences.date
  AND pca.latitude  = fire_occurrences.latitude
  AND pca.longitude = fire_occurrences.longitude
WHERE fire_occurrences.fire_occurred = 1
"""

In [8]:
# Keep the first 40 PCA components (highest variance) for rows with fires only
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, DoubleType

# Number of leading elements of the PCA feature array that are extracted into
# their own columns (indices 0 .. N_FEATURES_TO_KEEP - 1).
N_FEATURES_TO_KEEP = 40

def to_array(col):
    """Convert a vector column into an ``array<double>`` column via a Python UDF.

    Parameters
    ----------
    col : Column
        Column whose values expose ``toArray()`` (e.g. the ``pcaFeatures``
        vector column). Inside this function the name ``col`` is this
        parameter and shadows ``pyspark.sql.functions.col``; the name is kept
        so existing callers are unaffected.

    Returns
    -------
    Column
        The UDF applied to ``col``: each vector becomes a list of floats, and a
        null input stays null.
    """
    def to_array_(v):
        # The vector column is nullable; pass nulls through instead of failing
        # the whole job with an AttributeError on None.
        if v is None:
            return None
        return v.toArray().tolist()
    # Important: asNondeterministic requires Spark 2.3 or later.
    # It can be safely removed, i.e.
    #     return udf(to_array_, ArrayType(DoubleType()))(col)
    # but at the cost of decreased performance (presumably because the
    # optimizer may then re-evaluate the UDF for every extracted element).
    return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)

# Run the join, then turn the PCA vector into an array column so its leading
# N_FEATURES_TO_KEEP elements can be selected as individual columns next to
# the key, label and calendar fields.
joined_df = spark.sql(join_query)
joined_df = (
    joined_df
    .withColumn("pcaFeaturesArr", to_array(col("pcaFeatures")))
    .select(
        [
            "fire_occurrences.date",
            "fire_occurrences.latitude",
            "fire_occurrences.longitude",
            "fire_occurrences.fire_occurred",
            "fire_occurrences.acres_burned",
            "year",
            "month",
            "day",
        ]
        + [col("pcaFeaturesArr")[idx] for idx in range(N_FEATURES_TO_KEEP)]
    )
)

In [9]:
# Preview three joined rows in full (no truncation), one field per line.
# NOTE(review): the saved output below shows fire_occurred = 0 with a null
# acres_burned, which the `fire_occurred = 1` filter in join_query should
# exclude -- presumably stale output from an earlier query; re-run to confirm.
joined_df.show(3, truncate=False, vertical=True)

-RECORD 0----------------------------------
 date               | 915235200000000000   
 latitude           | 32.81666666666667    
 longitude          | -117.18333330000002  
 fire_occurred      | 0                    
 acres_burned       | null                 
 year               | 1999                 
 month              | 01                   
 day                | 02                   
 pcaFeaturesArr[0]  | -25.166433284344073  
 pcaFeaturesArr[1]  | -2.593375106022652   
 pcaFeaturesArr[2]  | -29.5444903995885    
 pcaFeaturesArr[3]  | 55.593746060079184   
 pcaFeaturesArr[4]  | -35.49629698184918   
 pcaFeaturesArr[5]  | -4.183085518368332   
 pcaFeaturesArr[6]  | 22.070675938301072   
 pcaFeaturesArr[7]  | 10.313034483159997   
 pcaFeaturesArr[8]  | -6.230868715529128   
 pcaFeaturesArr[9]  | -2.4827771351108727  
 pcaFeaturesArr[10] | 13.030193248126603   
 pcaFeaturesArr[11] | 0.30119836171710335  
 pcaFeaturesArr[12] | -0.8366699165031262  
 pcaFeaturesArr[13] | -23.052743

In [10]:
# Persist the fires-only dataset with the first 40 PCA components to the path
# below as gzip-compressed parquet, replacing any previous output. No
# partitioning is used because the data is not large.
# NOTE(review): the recorded run of this cell failed with "Session isn't
# active." from the session endpoint -- presumably the session had expired;
# re-run in a live session and confirm the output exists.
(
    joined_df.write
    .mode("overwrite")
    .option("compression", 'gzip')
    .parquet('s3a://dse-cohort5-group5/wildfire_capstone/integratedData/integratedData.pca40_and_firesonly.parquet.gz')
)

An error was encountered:
Invalid status code '400' from http://ip-172-31-21-221.ec2.internal:8998/sessions/3/statements/10 with error payload: {"msg":"requirement failed: Session isn't active."}
