# Wildfire Risk - Ingest
__Team 3 - Dave Friesen, John Chen, and Kyle Dalope__<br>
__ADS-508-02-SP23__<br><br>
__GitHub link: https://github.com/davefriesen/wildfire-risk__

In [2]:
# Notebook metadata: contributors, their contact addresses (same order), and release info
__authors__ = [
    'Dave Friesen',
    'John Chen',
    'Kyle Dalope',
]
__contact__ = [
    'dfriesen@sandiego.edu',
    'johnchen@sandiego.edu',
    'kdalope@sandiego.edu',
]
__date__, __license__, __version__ = '2023-03-20', 'MIT', '1.0.1'

# Setup Basics

In [3]:
# Import basic libraries
import boto3
import sagemaker

# Import data access libraries
import pandas as pd
from profiler import profile, profile_cat
!pip install --disable-pip-version-check -q PyAthena==2.1.0
from pyathena import connect

# Import utility libraries
from IPython.core.display import display, HTML

[0m

In [4]:
# Establish session fundamentals
sess = sagemaker.Session()
bucket = sess.default_bucket()          # default S3 bucket of the SageMaker session
role = sagemaker.get_execution_role()   # execution role of this notebook
region = boto3.Session().region_name

# Account ID of the calling identity, resolved through STS so that the name
# holds an actual identifier string rather than a service client object
account_id = boto3.client('sts', region_name=region).get_caller_identity()['Account']

# SageMaker API client, kept under its own name
sm = boto3.Session().client(service_name='sagemaker', region_name=region)

# Setup S3

In [5]:
# Set public path and store as SageMaker variable
s3_public_path = 's3://wildfire-risk/'
%store s3_public_path

# Set private path ("my bucket")
s3_private_path = 's3://{}/widfire-risk/data/'.format(bucket)
%store s3_private_path
print(s3_private_path)

Stored 's3_public_path' (str)
Stored 's3_private_path' (str)
s3://sagemaker-us-east-1-857283526476/widfire-risk/data/


In [6]:
# Get contents of Public S3 bucket (file source)
!aws s3 ls $s3_public_path

2023-03-24 02:49:22    8623294 conditions.csv
2023-03-18 23:31:25   10821874 fires.csv
2023-03-18 23:31:21   38317925 weather.csv


In [7]:
# Now copy public bucket-based data to private (local) bucket
!aws s3 cp --recursive $s3_public_path $s3_private_path'fires'/ --exclude '*' --include 'fires.csv'
!aws s3 cp --recursive $s3_public_path $s3_private_path'weather'/ --exclude '*' --include 'weather.csv'

copy: s3://wildfire-risk/fires.csv to s3://sagemaker-us-east-1-857283526476/widfire-risk/data/fires/fires.csv
copy: s3://wildfire-risk/weather.csv to s3://sagemaker-us-east-1-857283526476/widfire-risk/data/weather/weather.csv


In [8]:
# List contents of private S3 bucket to confirm copy
!aws s3 ls $s3_private_path'fires'/
!aws s3 ls $s3_private_path'weather'/

2023-03-24 18:46:04   10821874 fires.csv
2023-03-24 18:46:06   38317925 weather.csv


# Load and Validate Data (*traditional Pandas - as check*)

In [None]:
# Read the copied CSVs back from the private bucket and profile each as a sanity check
fires_df = pd.read_csv(f'{s3_private_path}fires/fires.csv')
weather_df = pd.read_csv(f'{s3_private_path}weather/weather.csv')

profile(fires_df)
profile(weather_df)

Unnamed: 0,Dtype,count,unique,na,na%,mean,std,min,max,skew(>=3),<v0.01,VIF(>=10),examples
ContainmentDateTime,object,35315,33108,53847.0,60.4,,,,,,,,nan__2014/08/06 03:0
ControlDateTime,object,32570,30401,56592.0,63.5,,,,,,,,nan__2014/08/07 00:4
DiscoveryAcres,float64,72195,348,16967.0,19.0,5.8,500.0,,115997.0,191.8,,,0.1__nan__0.1__nan__
EstimatedCostToDate,float64,2128,1040,87034.0,97.6,8193477.0,34440012.7,,800000000.0,13.1,,,nan__nan__nan__nan__
FinalAcres,float64,2631,345,86531.0,97.0,14.9,318.1,,13440.0,34.3,,,nan__nan__nan__nan__
FireBehaviorGeneral,object,1498,4,87664.0,98.3,,,,,,,,nan__nan__nan__nan__
FireBehaviorGeneral1,object,1223,16,87939.0,98.6,,,,,,,,nan__nan__nan__nan__
FireBehaviorGeneral2,object,1164,16,87998.0,98.7,,,,,,,,nan__nan__nan__nan__
FireBehaviorGeneral3,object,910,16,88252.0,99.0,,,,,,,,nan__nan__nan__nan__
FireCause,object,77967,4,11195.0,12.6,,,,,,,,Undetermined__Natura


# Setup Athena Database

In [None]:
# Set Athena database name
database_name = 'dsoaws'

# Set S3 staging directory (temp directory for Athena queries)
s3_staging_dir = 's3://{0}/athena/staging'.format(bucket)

# Establish S3 connection
conn = connect(region_name=region, s3_staging_dir=s3_staging_dir)

# Create Athena database
statement = 'CREATE DATABASE IF NOT EXISTS {}'.format(database_name)
pd.read_sql(statement, conn)

# Verify database creation
statement = 'SHOW DATABASES'
df_show = pd.read_sql(statement, conn)

if database_name in df_show.values:
    ingest_create_athena_db_passed = True
%store ingest_create_athena_db_passed    

df_show.head(5)

## Register fires.csv as Athena table

In [None]:
fires_csv = 'fires.csv'
fires_tb = 'fires'

# Force table creation: drop any previous definition so the schema below takes effect
statement = """DROP TABLE IF EXISTS {}.{}""".format(
    database_name, fires_tb
)    
pd.read_sql(statement, conn)

# Create an external table over the CSV folder in the private bucket.
# FinalAcres is declared double, matching DiscoveryAcres and the float64 dtype
# that the pandas profile reports for this column.
# NOTE(review): plain comma-delimited parsing does not honor quoted fields; if text
# columns such as IncidentShortDescription contain commas, later columns would shift
# - TODO confirm against the data.
statement = """CREATE EXTERNAL TABLE IF NOT EXISTS {}.{}(
    ContainmentDateTime string,
    ControlDateTime string,
    DiscoveryAcres double,
    EstimatedCostToDate double,
    FinalAcres double,
    FireBehaviorGeneral string,
    FireBehaviorGeneral1 string,
    FireBehaviorGeneral2 string,
    FireBehaviorGeneral3 string,
    FireCause string,
    FireCauseGeneral string,
    FireCauseSpecific string,
    FireDiscoveryDateTime string,
    FireOutDateTime string,
    GACC string,
    IncidentName string,
    IncidentShortDescription string,
    InitialLatitude double,
    InitialLongitude double,
    IsFireCauseInvestigated double,
    IsTrespass double,
    POOCity string,
    POOState string,
    PredominantFuelModel string,
    PrimaryFuelModel string
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n' LOCATION '{}'
TBLPROPERTIES ('skip.header.line.count'='1')""".format(
    database_name, fires_tb, s3_private_path+fires_tb+'/'
)
print(statement)

pd.read_sql(statement, conn)

## Register weather.csv as Athena table

In [None]:
weather_csv = 'weather.csv'
weather_tb = 'weather'

# Force table creation: remove any earlier definition first
statement = f"DROP TABLE IF EXISTS {database_name}.{weather_tb}"
pd.read_sql(statement, conn)

# Leading descriptive columns, followed by one (value, attributes) column pair per code
weather_columns = [
    ('Station', 'string'),
    ('WDate', 'string'),
    ('Latitude', 'double'),
    ('Longitude', 'double'),
    ('Elevation', 'double'),
    ('Name', 'string'),
]
for code in ('CDSD', 'CLDD', 'DT00', 'DT32', 'DX32', 'DX70', 'DX90',
             'EMNT', 'EMXT', 'HDSD', 'HTDD', 'TAVG', 'TMAX', 'TMIN'):
    weather_columns.append((code, 'double'))
    weather_columns.append((code + '_attributes', 'string'))

# Render the column list exactly as a hand-written DDL body (4-space indent, comma-separated)
weather_column_ddl = ',\n'.join(
    '    {} {}'.format(col_name, col_type) for col_name, col_type in weather_columns
)
weather_location = s3_private_path + weather_tb + '/'

# Create table
statement = (
    f"CREATE EXTERNAL TABLE IF NOT EXISTS {database_name}.{weather_tb}(\n"
    f"{weather_column_ddl}\n"
    ") ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n' "
    f"LOCATION '{weather_location}'\n"
    "TBLPROPERTIES ('skip.header.line.count'='1')"
)
print(statement)

pd.read_sql(statement, conn)

## Verify table creation

In [None]:
statement = "SHOW TABLES in {}".format(database_name)
df_show = pd.read_sql(statement, conn)

if [fires_tb, weather_tb] in df_show.values:
    ingest_create_athena_table_passed = True
%store ingest_create_athena_table_passed

df_show.head(5)

## Run sample queries

In [None]:
# Sample query: first rows of the fires table whose cause is 'Natural'
statement = (
    f"SELECT * FROM {database_name}.{fires_tb}\n"
    "    WHERE FireCause = 'Natural' LIMIT 50"
)
print(statement)

df = pd.read_sql(statement, conn)
df.head(5)

In [None]:
# Sample query: a handful of rows from the weather table
statement = f"SELECT * FROM {database_name}.{weather_tb} LIMIT 10"
print(statement)

df = pd.read_sql(statement, conn)
df.head(5)

# Setup Parquet

In [None]:
# Parquet directory path under the session's default bucket
s3_parquet_dir = f's3://{bucket}/parquet'

## Create Parquet file from fires.csv

In [None]:
fires_tb_parquet = 'fires_parquet'

# Force table creation
statement = """DROP TABLE IF EXISTS {}.{}""".format(
    database_name, fires_tb_parquet
)    
pd.read_sql(statement, conn)

# SKIP PARTITIONING INITIALLY: WITH (format = 'PARQUET', external_location = '{}', partitioned_by = ARRAY['firecause']) AS
# Create table
statement = """CREATE TABLE IF NOT EXISTS {}.{}
WITH (format = 'PARQUET', external_location = '{}') AS
SELECT
    ContainmentDateTime,
    ControlDateTime,
    DiscoveryAcres,
    EstimatedCostToDate,
    FinalAcres,
    FireBehaviorGeneral,
    FireBehaviorGeneral1,
    FireBehaviorGeneral2,
    FireBehaviorGeneral3,
    FireCause,
    FireCauseGeneral,
    FireCauseSpecific,
    FireDiscoveryDateTime,
    FireOutDateTime,
    GACC,
    IncidentName,
    IncidentShortDescription,
    InitialLatitude,
    InitialLongitude,
    IsFireCauseInvestigated,
    IsTrespass,
    POOCity,
    POOState,
    PredominantFuelModel,
    PrimaryFuelModel
FROM {}.{}""".format(
    database_name, fires_tb_parquet, s3_private_path+fires_tb_parquet+'/', database_name, fires_tb
)
print(statement)

pd.read_sql(statement, conn)

In [None]:
# Ask Athena to (re)discover partitions for the Parquet table
statement = f"MSCK REPAIR TABLE {database_name}.{fires_tb_parquet}"
print(statement)

df = pd.read_sql(statement, conn)
df.head(5)

In [None]:
# Show partitions
# Only builds and prints the SHOW PARTITIONS statement; the two lines that would
# execute it and display the result are commented out below.
# NOTE(review): presumably disabled because the Parquet table is created without
# partitioned_by, so there are no partitions to list - TODO confirm.
statement = "SHOW PARTITIONS {}.{}".format(database_name, fires_tb_parquet)
print(statement)

#df_partitions = pd.read_sql(statement, conn)
#df_partitions.head(5)

In [None]:
# Show tables
statement = "SHOW TABLES in {}".format(database_name)
df_tables = pd.read_sql(statement, conn)

if fires_tb_parquet in df_tables.values:
    ingest_create_athena_table_parquet_passed = True
%store ingest_create_athena_table_parquet_passed

df_tables.head(5)

In [None]:
# Sample query against the Parquet-backed table (same filter as the CSV-backed check)
statement = (
    f"SELECT * FROM {database_name}.{fires_tb_parquet}\n"
    "    WHERE FireCause = 'Natural' LIMIT 50"
)
print(statement)

df = pd.read_sql(statement, conn)
df.head(5)

# Store Variables and Close Session

In [None]:
# List the variables currently held in IPython's %store database.
# A bare %store only displays them; each variable was saved earlier in this
# notebook with `%store <name>`.
%store

In [None]:
%%html

<!-- Shows a notice, then programmatically clicks a hidden button bound to the
     "kernelmenu:shutdown" command so the kernel is shut down when this cell renders. -->
<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
// Click the first element carrying the command-button class; any failure
// (e.g. no such element) is deliberately ignored.
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp: best-effort shutdown, errors are swallowed on purpose
}    
</script>

In [None]:
%%javascript

// Save a checkpoint and delete the notebook session through the global
// `Jupyter` object. If that object or its methods are unavailable the
// error is caught and ignored.
try {
    Jupyter.notebook.save_checkpoint();
    Jupyter.notebook.session.delete();
}
catch(err) {
    // NoOp: best-effort, errors are swallowed on purpose
}