In [38]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version

spark_version = 'spark-3.2.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Ign:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:11 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:12 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]


In [39]:
#  get the postgres driver
!wget https://jdbc.postgresql.org/download/postgresql-42.3.1.jar

--2022-01-23 18:57:55--  https://jdbc.postgresql.org/download/postgresql-42.3.1.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1015689 (992K) [application/java-archive]
Saving to: ‘postgresql-42.3.1.jar.1’


2022-01-23 18:57:56 (5.57 MB/s) - ‘postgresql-42.3.1.jar.1’ saved [1015689/1015689]



In [40]:
#create a spark  session
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("HE Data") \
    .config("spark.driver.extraClassPath", "/content/postgresql-42.3.1.jar") \
    .getOrCreate()

In [81]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
spark.sparkContext.addFile("https://expeditiondata.s3.us-east-2.amazonaws.com/updated_data.csv")
updated_df=spark.read.csv(SparkFiles.get('updated_data.csv'),header=True,  inferSchema=True)
updated_df.show()

+-------------+-------+----------+-------+------+-------------+--------------+----------------+-------+-------------+-----------+------------------+------------+---+---+----------------+-------+----+-----------+----+-------------+
|expedition_id|peak_id| peak_name|year_id|season|basecamp_date|highpoint_date|termination_date|members|member_deaths|hired_staff|hired_staff_deaths|   member_id|sex|age|highpoint_metres|success|solo|oxygen_used|died|height_metres|
+-------------+-------+----------+-------+------+-------------+--------------+----------------+-------+-------------+-----------+------------------+------------+---+---+----------------+-------+----+-----------+----+-------------+
|    AMAD78301|   AMAD|Ama Dablam|   1978|Autumn|    10/1/1978|    10/20/1978|      10/23/1978|      8|            0|          5|                 0|AMAD78301-01|  M| 40|              NA|      f|   f|          f|   f|         6814|
|    AMAD78301|   AMAD|Ama Dablam|   1978|Autumn|    10/1/1978|    10/20/197

In [42]:
#remove highpoint_metres column
# Load in a sql function to use columns
from pyspark.sql.functions import col

In [43]:
dropped_hm_df = updated_df.drop("highpoint_metres")

In [44]:
dropped_hm_df.show()

+-------------+-------+----------+-------+------+-------------+--------------+----------------+-------+-------------+-----------+------------------+------------+---+---+-------+----+-----------+----+-------------+
|expedition_id|peak_id| peak_name|year_id|season|basecamp_date|highpoint_date|termination_date|members|member_deaths|hired_staff|hired_staff_deaths|   member_id|sex|age|success|solo|oxygen_used|died|height_metres|
+-------------+-------+----------+-------+------+-------------+--------------+----------------+-------+-------------+-----------+------------------+------------+---+---+-------+----+-----------+----+-------------+
|    AMAD78301|   AMAD|Ama Dablam|   1978|Autumn|    10/1/1978|    10/20/1978|      10/23/1978|      8|            0|          5|                 0|AMAD78301-01|  M| 40|      f|   f|          f|   f|         6814|
|    AMAD78301|   AMAD|Ama Dablam|   1978|Autumn|    10/1/1978|    10/20/1978|      10/23/1978|      8|            0|          5|               

In [45]:
#create list to filer df to include top 15 most frequented peaks
peak_list=['EVER','CHOY','AMAD','MANA','DHA1',
           'MAKA','LHOT','BARU','PUMO','ANN1','KANG',
           'HIML','ANN4','PUTH','TILI']

In [46]:
#filter df to only included climbers on peaks in top 15
filtered_df = dropped_hm_df.filter(dropped_hm_df.peak_id.isin(peak_list))

In [47]:
#row count of filtered df
filtered_df.count()

61638

In [48]:
#create column showing success by definition (reached summit and survived)
from pyspark.sql.functions import when
success_df = filtered_df.withColumn("defined_success", 
                when(((filtered_df.success == 't') & (filtered_df.died == 'f')),"TRUE") 
               .otherwise("FALSE")) 
success_df.show()

+-------------+-------+----------+-------+------+-------------+--------------+----------------+-------+-------------+-----------+------------------+------------+---+---+-------+----+-----------+----+-------------+---------------+
|expedition_id|peak_id| peak_name|year_id|season|basecamp_date|highpoint_date|termination_date|members|member_deaths|hired_staff|hired_staff_deaths|   member_id|sex|age|success|solo|oxygen_used|died|height_metres|defined_success|
+-------------+-------+----------+-------+------+-------------+--------------+----------------+-------+-------------+-----------+------------------+------------+---+---+-------+----+-----------+----+-------------+---------------+
|    AMAD78301|   AMAD|Ama Dablam|   1978|Autumn|    10/1/1978|    10/20/1978|      10/23/1978|      8|            0|          5|                 0|AMAD78301-01|  M| 40|      f|   f|          f|   f|         6814|          FALSE|
|    AMAD78301|   AMAD|Ama Dablam|   1978|Autumn|    10/1/1978|    10/20/1978|  

In [49]:
#drop duplicates
success_df.dropDuplicates()
success_df.count()

61638

Removing Null Values

In [53]:
# Count the number of null values for each column
Dict_Null = {col:success_df.filter(success_df[col].isNull()).count() for col in success_df.columns}
Dict_Null


{'age': 8,
 'basecamp_date': 0,
 'defined_success': 0,
 'died': 8,
 'expedition_id': 0,
 'height_metres': 0,
 'highpoint_date': 0,
 'hired_staff': 0,
 'hired_staff_deaths': 0,
 'member_deaths': 0,
 'member_id': 8,
 'members': 0,
 'oxygen_used': 8,
 'peak_id': 0,
 'peak_name': 0,
 'season': 0,
 'sex': 8,
 'solo': 8,
 'success': 8,
 'termination_date': 0,
 'year_id': 0}

In [68]:
# drop null columns
# Drop the null columns where all values are null
success_df = success_df.dropna(how='all')

# Drop rows containing NaN values
success_df = success_df.dropna()



In [69]:
# Count the number of null values for each column
Dict_Null = {col:success_df.filter(success_df[col].isNull()).count() for col in success_df.columns}
Dict_Null


{'age': 0,
 'basecamp_date': 0,
 'defined_success': 0,
 'died': 0,
 'expedition_id': 0,
 'height_metres': 0,
 'highpoint_date': 0,
 'hired_staff': 0,
 'hired_staff_deaths': 0,
 'member_deaths': 0,
 'member_id': 0,
 'members': 0,
 'oxygen_used': 0,
 'peak_id': 0,
 'peak_name': 0,
 'season': 0,
 'sex': 0,
 'solo': 0,
 'success': 0,
 'termination_date': 0,
 'year_id': 0}

In [75]:
#row count after nulls dropped
success_df.count()
success_df.show()

+-------------+-------+----------+-------+------+-------------+--------------+----------------+-------+-------------+-----------+------------------+------------+---+---+-------+----+-----------+----+-------------+---------------+
|expedition_id|peak_id| peak_name|year_id|season|basecamp_date|highpoint_date|termination_date|members|member_deaths|hired_staff|hired_staff_deaths|   member_id|sex|age|success|solo|oxygen_used|died|height_metres|defined_success|
+-------------+-------+----------+-------+------+-------------+--------------+----------------+-------+-------------+-----------+------------------+------------+---+---+-------+----+-----------+----+-------------+---------------+
|    AMAD78301|   AMAD|Ama Dablam|   1978|Autumn|    10/1/1978|    10/20/1978|      10/23/1978|      8|            0|          5|                 0|AMAD78301-01|  M| 40|      f|   f|          f|   f|         6814|          FALSE|
|    AMAD78301|   AMAD|Ama Dablam|   1978|Autumn|    10/1/1978|    10/20/1978|  

Removing Outliers

In [76]:
df = success_df

In [89]:
import pandas  as pd
pd.df['age'].plot.box()

AttributeError: ignored

In [78]:
import pandas as pd


import matplotlib.pyplot as plt
from matplotlib import style
#style.use('fivethirtyeight')
import seaborn as sns
import plotly.express as px

cols = ["age"] 

Q3 = success_df[cols].quantile(0.60)
Q1 = success_df[cols].quantile(0.01)
IQR = Q3 - Q1

#print("Upper Limit:", Q3)
#print("Lower Limit:", Q1)
#print("Inter-Quartile Range:", IQR)

#df = df[~((df[cols] < (Q1 - 1.5 * IQR)) |(df[cols] > (Q3 + 1.5 * IQR))).any(axis=1)]
success_df

AttributeError: ignored

In [62]:
defined_clean_df = success_df
defined_clean_df.show()

+-------------+-------+----------+-------+------+-------------+--------------+----------------+-------+-------------+-----------+------------------+------------+---+---+-------+----+-----------+----+-------------+---------------+
|expedition_id|peak_id| peak_name|year_id|season|basecamp_date|highpoint_date|termination_date|members|member_deaths|hired_staff|hired_staff_deaths|   member_id|sex|age|success|solo|oxygen_used|died|height_metres|defined_success|
+-------------+-------+----------+-------+------+-------------+--------------+----------------+-------+-------------+-----------+------------------+------------+---+---+-------+----+-----------+----+-------------+---------------+
|    AMAD78301|   AMAD|Ama Dablam|   1978|Autumn|    10/1/1978|    10/20/1978|      10/23/1978|      8|            0|          5|                 0|AMAD78301-01|  M| 40|      f|   f|          f|   f|         6814|          FALSE|
|    AMAD78301|   AMAD|Ama Dablam|   1978|Autumn|    10/1/1978|    10/20/1978|  

In [63]:
defined_clean_df.printSchema()

root
 |-- expedition_id: string (nullable = true)
 |-- peak_id: string (nullable = true)
 |-- peak_name: string (nullable = true)
 |-- year_id: integer (nullable = true)
 |-- season: string (nullable = true)
 |-- basecamp_date: string (nullable = true)
 |-- highpoint_date: string (nullable = true)
 |-- termination_date: string (nullable = true)
 |-- members: integer (nullable = true)
 |-- member_deaths: integer (nullable = true)
 |-- hired_staff: integer (nullable = true)
 |-- hired_staff_deaths: integer (nullable = true)
 |-- member_id: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- success: string (nullable = true)
 |-- solo: string (nullable = true)
 |-- oxygen_used: string (nullable = true)
 |-- died: string (nullable = true)
 |-- height_metres: integer (nullable = true)
 |-- defined_success: string (nullable = false)



In [64]:
# Configure settings for RDS

server_name= "jdbc:postgresql://group-1.c08lganpj8oa.us-east-2.rds.amazonaws.com:5432/postgres"




In [65]:
#write data  frame to active
clean_df.write.jdbc(server_name,'defined_clean',  mode='append',properties={
    "user":'root',
    "password":'Group1!!',
    "driver": 'org.postgresql.Driver'
})