### This notebook covers uploading Argo data to Amazon RDS and examining it there. We start with a large CSV file (4.7 GB, 63 million rows) produced by our Argo API parsing script.
### Let's examine the first 100 rows of the Argo CSV.

In [3]:
# Read in the file
import pandas as pd
import numpy as np
df = pd.read_csv('argo_data.csv', nrows=100)
cols = df.columns
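
Reading only `nrows=100` is enough for a first look, but the full 4.7 GB file will not necessarily fit in memory. A minimal sketch of streaming it with the `chunksize` argument of `pd.read_csv` follows. The function name `summarize_csv_in_chunks` and the default chunk size are assumptions made for illustration, not part of the original notebook.

```python
import pandas as pd


def summarize_csv_in_chunks(path, chunksize=1_000_000):
    """Stream a large CSV and return (total_rows, rows_missing_psal)."""
    total_rows = 0
    missing_psal = 0
    # With chunksize set, read_csv yields DataFrames of at most `chunksize` rows
    for chunk in pd.read_csv(path, chunksize=chunksize):
        total_rows += len(chunk)
        missing_psal += int(chunk["psal"].isna().sum())
    return total_rows, missing_psal
```

Calling `summarize_csv_in_chunks('argo_data.csv')` would give the row count and the number of missing `psal` values without loading the whole file at once.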

In [10]:
# Look at first few rows
df.head()

Unnamed: 0,profile_id,pres,temp,lat,lon,psal,date
0,5904437_131,6.7,8.531,-49.507,-177.884,,2018-06-01T23:57:40.002Z
1,5904437_131,10.1,8.53,-49.507,-177.884,,2018-06-01T23:57:40.002Z
2,5904437_131,20.1,8.531,-49.507,-177.884,,2018-06-01T23:57:40.002Z
3,5904437_131,31.0,8.527,-49.507,-177.884,,2018-06-01T23:57:40.002Z
4,5904437_131,41.0,8.528,-49.507,-177.884,,2018-06-01T23:57:40.002Z


In [11]:
# NOTE: psal contains NaN values; replace them with the string 'NULL' for now
df['psal'] = df['psal'].fillna('NULL')

### Next, let's add this small amount of data to our AWS RDS for practice. First, we will connect to our RDS with psycopg2.
### Useful database details:
#### Master username = argo 
#### Master password = DistributedComputing
#### Database name = msds_argo (underscore, NOT a dash)
#### DB instance identifier = zam (zach, andrew, maxine)
#### (Note: Your IAMs should work for username, password as well)

In [31]:
# From: https://towardsdatascience.com/amazon-rds-step-by-step-guide-14f9f3087d28
# Establish a connection to our free-tier Postgres Amazon RDS
import psycopg2
connection = psycopg2.connect(
    host = 'zam.cnynl0ibutvj.us-west-2.rds.amazonaws.com',
    port = '5432',
    user = 'argo',
    password = 'DistributedComputing',
    database='msds_argo'
    )
cursor=connection.cursor()


The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
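
One way to keep the connection details out of the notebook itself is to read them from environment variables. The sketch below only builds the keyword arguments for `psycopg2.connect`. The variable names (`ARGO_DB_HOST`, `ARGO_DB_PORT`, `ARGO_DB_USER`, `ARGO_DB_PASSWORD`, `ARGO_DB_NAME`) and the helper name are hypothetical and would need to be set in your own environment.

```python
import os


def rds_connection_kwargs(env=os.environ):
    """Build psycopg2.connect() keyword arguments from environment variables.

    The ARGO_DB_* variable names are assumptions made for this sketch.
    """
    return {
        "host": env["ARGO_DB_HOST"],
        "port": env.get("ARGO_DB_PORT", "5432"),
        "user": env["ARGO_DB_USER"],
        "password": env["ARGO_DB_PASSWORD"],
        "dbname": env.get("ARGO_DB_NAME", "msds_argo"),
    }


# Usage (not executed here, since it needs a reachable database):
# import psycopg2
# connection = psycopg2.connect(**rds_connection_kwargs())
# cursor = connection.cursor()
```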



### Next, we will create a flat table to upload our CSV data to. I will comment out the creating table code so we don't accidentally run it again when we don't need to (since the table already exists in RDS).

In [123]:
# Show datatypes of our CSV
df.dtypes

profile_id     object
pres          float64
temp          float64
lat           float64
lon           float64
psal           object
date           object
dtype: object

In [126]:
# Delete the table
# connection.rollback()
# cursor.execute('DROP TABLE "argo";')  
# connection.commit()

In [127]:
# # Create empty argo table
# cursor.execute("""CREATE TABLE argo(
# profile_id text,
# pres float,
# temp float,
# lat float,
# lon float,
# psal float,
# date text)""")

In [128]:
# Commit the table creation
# connection.commit()
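
Instead of commenting the cell out, the statement could be made safe to re-run with `CREATE TABLE IF NOT EXISTS`. The sketch below builds that statement from a column-to-dtype mapping such as the `df.dtypes` output above. The helper name, the dtype mapping, and the `overrides` argument are assumptions for illustration. Note that `psal` shows up as `object` only because we filled it with the string 'NULL', so it is overridden to a float column here.

```python
# Assumed mapping from pandas dtype names to Postgres column types
PANDAS_TO_POSTGRES = {
    "object": "text",
    "float64": "double precision",
    "int64": "bigint",
}


def create_table_sql(table, dtypes, overrides=None):
    """Return a re-runnable CREATE TABLE statement.

    `dtypes` maps column name -> pandas dtype (or its name as a string).
    `overrides` maps column name -> Postgres type, taking precedence.
    Names are interpolated unquoted, so use trusted identifiers only.
    """
    overrides = overrides or {}
    columns = []
    for name, dtype in dtypes.items():
        pg_type = overrides.get(name, PANDAS_TO_POSTGRES[str(dtype)])
        columns.append(f"{name} {pg_type}")
    return f"CREATE TABLE IF NOT EXISTS {table} (" + ", ".join(columns) + ")"


# Usage (not executed here):
# sql = create_table_sql("argo", dict(df.dtypes), {"psal": "double precision"})
# cursor.execute(sql)
# connection.commit()
```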

In [4]:
# Verify our table creation with pandas
sql = """
SELECT "table_name","column_name", "data_type", "table_schema"
FROM INFORMATION_SCHEMA.COLUMNS
WHERE "table_schema" = 'public'
ORDER BY table_name  
"""
pd.read_sql(sql, con=connection)

Unnamed: 0,table_name,column_name,data_type,table_schema
0,argo,pres,double precision,public
1,argo,lon,double precision,public
2,argo,psal,double precision,public
3,argo,profile_id,text,public
4,argo,lat,double precision,public
5,argo,date,text,public
6,argo,temp,double precision,public


### Next, let's add the CSV data to our table. For now, I will just practice by adding the first 5 rows of the 100 we read in. I will comment out the adding data code so we don't accidentally run it again when we don't need to.

In [130]:
# Keep only the first 5 rows, write them to CSV, and upload that file to Postgres RDS
df = df.head()
df.to_csv('small.csv', index=False)

# Double check the data looks like how we want it to
small = pd.read_csv('small.csv', keep_default_na=False)

In [131]:
small

Unnamed: 0,profile_id,pres,temp,lat,lon,psal,date
0,5904437_131,6.7,8.531,-49.507,-177.884,,2018-06-01T23:57:40.002Z
1,5904437_131,10.1,8.53,-49.507,-177.884,,2018-06-01T23:57:40.002Z
2,5904437_131,20.1,8.531,-49.507,-177.884,,2018-06-01T23:57:40.002Z
3,5904437_131,31.0,8.527,-49.507,-177.884,,2018-06-01T23:57:40.002Z
4,5904437_131,41.0,8.528,-49.507,-177.884,,2018-06-01T23:57:40.002Z


In [112]:
# Copy the test CSV (small.csv, written above) into the argo table on RDS
# with open('small.csv', 'r') as f:
#     next(f)  # Skip the header row.
#     cursor.copy_from(f, 'argo', sep=',', null='NULL')

In [113]:
# Commit the data upload
# connection.commit()  
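
An alternative to `copy_from` is psycopg2's `cursor.copy_expert`, which takes a full `COPY` statement and so lets Postgres parse the file as real CSV and skip the header itself. This is a sketch under the assumption that the CSV columns are in the same order as the table columns. The function name `copy_csv_to_table` is hypothetical.

```python
def copy_csv_to_table(cursor, csv_path, table="argo"):
    """Stream a CSV file into `table` with COPY ... FROM STDIN.

    `cursor` is expected to be a psycopg2 cursor. The table name is
    interpolated directly, so pass trusted identifiers only.
    """
    sql = f"COPY {table} FROM STDIN WITH (FORMAT csv, HEADER true, NULL 'NULL')"
    with open(csv_path, "r") as f:
        # HEADER true makes Postgres skip the first line of the file
        cursor.copy_expert(sql, f)


# Usage (not executed here):
# copy_csv_to_table(cursor, 'small.csv')
# connection.commit()
```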

### Let's perform a SQL query to verify our data was uploaded.

In [136]:
# Create a query
query = """
SELECT AVG(temp)
FROM argo
"""
# Read query into pandas dataframe
pd.read_sql(query, con=connection)

Unnamed: 0,avg
0,


In [133]:
# Verify the value matches the dataframe analogue
small['temp'].mean()

8.529399999999999
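
Because both averages are floating-point numbers, comparing them with a tolerance is safer than comparing them with `==`. The sketch below uses an in-memory SQLite table as a stand-in for the RDS table (an assumption, so that it runs without a database server) and the five `temp` values shown above. The helper name `averages_match` is hypothetical.

```python
import math
import sqlite3


def averages_match(sql_avg, local_avg, rel_tol=1e-9):
    """Return True if the two averages agree within a relative tolerance."""
    return math.isclose(sql_avg, local_avg, rel_tol=rel_tol)


# The five temp values from the small sample above
temps = [8.531, 8.53, 8.531, 8.527, 8.528]

# SQLite stands in for Postgres here; the AVG query has the same form
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE argo (temp REAL)")
conn.executemany("INSERT INTO argo (temp) VALUES (?)", [(t,) for t in temps])
sql_avg = conn.execute("SELECT AVG(temp) FROM argo").fetchone()[0]
conn.close()

local_avg = sum(temps) / len(temps)
print(averages_match(sql_avg, local_avg))
```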

### Now, let's upload the full dataset to the RDS. Again, I will comment out the adding data code so we don't accidentally run it again when we don't need to.

In [135]:
# Delete the table (if needed)
# connection.rollback()
# cursor.execute('DROP TABLE "argo";')  
# connection.commit()

In [138]:
# Upload the full CSV to RDS
# Note - This takes a while (an hour!!!) to run
# with open('argo_data.csv', 'r') as f:
#     next(f)  # Skip the header row.
#     cursor.copy_from(f, 'argo', sep=',', null='NULL')

In [139]:
# Commit the data upload
# connection.commit()  

### Let's make sure the full dataset was uploaded successfully.

In [140]:
# Create a query
query = """
SELECT COUNT(DISTINCT(profile_id))
FROM argo
"""
# Read query into pandas dataframe
pd.read_sql(query, con=connection)

Unnamed: 0,count
0,103637


In [32]:
# Create a query
query = """
SELECT COUNT(*)
FROM argo
"""
# Read query into pandas dataframe
pd.read_sql(query, con=connection)

Unnamed: 0,count
0,63423133
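
To check the `COUNT(*)` result against the source file, the number of data rows in the CSV can be counted locally. This is a minimal sketch that assumes the file has exactly one header line. The function name `count_data_rows` is hypothetical.

```python
import csv


def count_data_rows(path):
    """Count CSV records in `path`, excluding the single header row."""
    with open(path, "r", newline="") as f:
        reader = csv.reader(f)
        next(reader, None)  # skip the header row, if the file is not empty
        return sum(1 for _ in reader)


# Usage (not executed here, the full file takes a while to scan):
# count_data_rows('argo_data.csv')  # compare with the COUNT(*) result above
```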


### Next, let's get JDBC set up. This should allow us to load the table as a Spark dataframe, which we can then convert to an RDD for distributed data manipulation.

### Download the JDBC driver: https://jdbc.postgresql.org/download.html
### Follow the steps from: https://stackoverflow.com/questions/34948296/using-pyspark-to-connect-to-postgresql
### export SPARK_CLASSPATH=/path/to/downloaded/jar (for me the path is just to my downloads folder)
### Follow the steps for python code from: https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
### The following code should be run in an EMR PySpark shell!

In [None]:
# %%configure -f
# {"jars": ["s3://argojarfile/jdbc-postgresql.jar"]}

# from pyspark import SparkContext
# from pyspark.sql import SQLContext
# from pyspark.sql import SparkSession
# from pyspark.sql import DataFrameReader

# sc = SparkContext.getOrCreate()
# sqlContext = SQLContext(sc)

# url = 'postgresql://zam.cnynl0ibutvj.us-west-2.rds.amazonaws.com:5432/msds_argo'

# properties = {"user":"argo", "password":"DistributedComputing", "driver":"org.postgresql.Driver"}

# df = DataFrameReader(sqlContext).jdbc(url = 'jdbc:%s' %url, table='argo', properties=properties)
# df.count()
# rdd = df.rdd
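
The same read can also be written against a `SparkSession` through `spark.read.jdbc`. The sketch below is a hedged variant of the cell above: the helper names `jdbc_url` and `read_argo_table` are hypothetical, and it assumes the PostgreSQL JDBC driver jar is already on the Spark classpath.

```python
def jdbc_url(host, port, database):
    """Build a PostgreSQL JDBC URL."""
    return f"jdbc:postgresql://{host}:{port}/{database}"


def read_argo_table(spark, url, user, password, table="argo"):
    """Load a Postgres table as a Spark DataFrame over JDBC.

    `spark` is expected to be an existing SparkSession.
    """
    properties = {
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",
    }
    return spark.read.jdbc(url=url, table=table, properties=properties)


# Usage (not executed here, since it needs a Spark cluster and the database):
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.getOrCreate()
# url = jdbc_url('zam.cnynl0ibutvj.us-west-2.rds.amazonaws.com', 5432, 'msds_argo')
# df = read_argo_table(spark, url, user='argo', password='...')
# rdd = df.rdd
```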

### Now, we will do some data visualizations on summary statistics we had previously queried.

In [1]:
import plotly
import plotly.graph_objects as go

In [4]:
# Read in the file
df = pd.read_csv('argo_results.csv')

In [5]:
df.head()

Unnamed: 0,column_name,average,min,max,stddev
0,pressure,886.098342,-2.0,5999.6,603.191132
1,temperature,6.627487,-2.164,31.863,5.875163
2,psal,34.553471,15.192,36.802,0.373934


In [38]:
df2 = df.loc[df.column_name != 'pressure']
fig = go.Figure(data=[
    go.Bar(name='min', x=df2.column_name, y=df2['min']),
    go.Bar(name='max', x=df2.column_name, y=df2['max']),
])
fig.update_layout(barmode='group')
fig.show()
plotly.offline.plot(fig, filename = '1.html', auto_open=False)

'1.html'

In [39]:
df2 = df.loc[df.column_name != 'pressure']
fig = go.Figure(data=[
    go.Bar(name='average', x=df2.column_name, y=df2['average']),
    go.Bar(name='standard deviation', x=df2.column_name, y=df2['stddev'])
])
fig.update_layout(barmode='group')
fig.show()
plotly.offline.plot(fig, filename = '2.html', auto_open=False)

'2.html'

In [40]:
import plotly.express as px
fig = px.bar(x=['Three', 'Five'], y=[163.7, 120.6])
fig.show()
plotly.offline.plot(fig, filename = '3.html', auto_open=False)

'3.html'