In [16]:
import requests
import pandas as pd
import matplotlib.pyplot as plt
import geopandas as gpd
import contextily as cx
import json
from pyproj import Transformer

In [49]:
import boto3
from datetime import datetime
from secrets import access_key, secret_access_key, api_key

## Collection

In [45]:
def api_connection(tide, format, lat, lon, tp, api_key):
    base_url = 'http://api.worldweatheronline.com'
    
    api_values = {'tide': tide,
                  'format': format,
                  'lat': lat,
                  'lon': lon,
                  'tp': tp,
                  'api_key': api_key}
    
    api = '/premium/v1/marine.ashx?key={api_key}&format={format}&q={lat},{lon}&tide={tide}&tp={tp}'\
    .format(api_key = api_values['api_key'],\
            format = api_values['format'],\
            lat = api_values['lat'],\
            lon = api_values['lon'],\
            tide = api_values['tide'],\
            tp = api_values['tp'])
    
    url = base_url + api
    response = requests.get(url)
    json_text = response.json()
    
    json_string = json.dumps(json_text)
    
    return(json_string)

In [36]:
def s3_load(bucket, access_key, secret_access_key, json_string):
    utcs_now = datetime.utcnow().strftime('%Y%m%d_%H%M')
    bucket = bucket
    client = boto3.client('s3',
                     aws_access_key_id = access_key,
                     aws_secret_access_key = secret_access_key)
    key = 'not_processed/surf_data_{}.json'.format(utcs_now)
    client.put_object(Body = json_string, 
                  Bucket = bucket, 
                  Key = key)

In [46]:
json_string = api_connection('yes', 'json', -0.763184, -90.332251, 1, api_key)
s3_load('galapagos-surf-eu-west-2', access_key, secret_access_key, json_string)

## Processing files

In [227]:
def list_not_processed_files(bucket, prefix, access_key, secret_access_key):
    """
    Reading json files in bucket
    """
    s3 = boto3.client('s3',
                aws_access_key_id = access_key,
                aws_secret_access_key = secret_access_key)
    
    bucket = bucket
    prefix = prefix
    files = []

    for key in s3.list_objects(Bucket=bucket, Prefix = prefix)['Contents']:
        files.append(key['Key'])
    
    return files

In [228]:
not_processed = list_not_processed_files('galapagos-surf-eu-west-2', 'not_processed', access_key, secret_access_key)

In [229]:
not_processed

['not_processed/surf_data_20220610_2237.json',
 'not_processed/surf_data_20220610_2247.json',
 'not_processed/surf_data_20220611_1638.json',
 'not_processed/surf_data_20220612_1541.json']

In [60]:
s3 = boto3.client('s3',
                    aws_access_key_id = access_key ,
                    aws_secret_access_key = secret_access_key)

response = s3.get_object(Bucket='galapagos-surf-eu-west-2', Key='not_processed/surf_data_20220610_2237.json')
contentBody = response.get("Body").read().decode('utf-8')

In [230]:
surf_json = json.loads(contentBody)

In [373]:
content_object = s3.Object(bucket, 'not_processed/surf_data_20220610_2237.json')

In [374]:
file_content = content_object.get()['Body'].read().decode('utf-8')
json_content = json.loads(file_content)

In [376]:
def surf_file_to_df(bucket, key):
    response = s3.Object(bucket, key)
    contentBody = response.get()['Body'].read().decode('utf-8')
    surf_json = json.loads(contentBody)
    df = pd.json_normalize(surf_json['data']['weather'], ['hourly'], ['date'])
    df_upload = df[['date','time', 'tempC', 'tempF', 'windspeedMiles', 'windspeedKmph',
               'winddirDegree', 'winddir16Point', 'weatherCode', 'visibility',
               'visibilityMiles', 'sigHeight_m', 'swellHeight_m', 'swellHeight_ft',
               'swellDir', 'swellDir16Point', 'swellPeriod_secs', 'waterTemp_C',
               'waterTemp_F']]
    return(df_upload)

In [377]:
surf_file_to_df(bucket, 'not_processed/surf_data_20220610_2237.json')

Unnamed: 0,date,time,tempC,tempF,windspeedMiles,windspeedKmph,winddirDegree,winddir16Point,weatherCode,visibility,visibilityMiles,sigHeight_m,swellHeight_m,swellHeight_ft,swellDir,swellDir16Point,swellPeriod_secs,waterTemp_C,waterTemp_F
0,2022-06-10,0,22,71,13,21,157,SSE,116,10,6,0.5,1.1,3.6,140,SE,9.5,21,70
1,2022-06-10,100,22,71,12,19,161,SSE,116,10,6,0.5,1.1,3.6,140,SE,9.4,23,73
2,2022-06-10,200,22,71,11,17,165,SSE,116,10,6,0.4,1.1,3.6,140,SE,9.3,23,73
3,2022-06-10,300,22,71,10,15,169,SSE,116,10,6,0.4,1.1,3.6,140,SE,9.2,21,70
4,2022-06-10,400,22,72,10,16,166,SSE,116,10,6,0.4,1.1,3.6,140,SE,9.2,23,73
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
163,2022-06-16,1900,22,71,15,24,164,SSE,113,10,6,0.6,1.1,3.6,240,WSW,12.3,23,73
164,2022-06-16,2000,22,71,15,24,165,SSE,113,10,6,0.6,1.1,3.6,200,SSW,12.2,23,73
165,2022-06-16,2100,22,71,15,24,165,SSE,116,10,6,0.6,1.1,3.6,160,SSE,12.2,21,70
166,2022-06-16,2200,22,71,15,24,164,SSE,116,10,6,0.6,1.1,3.6,200,SSW,12.2,23,73


In [232]:
df = pd.json_normalize(surf_json['data']['weather'], ['hourly'], ['date'])

In [254]:
df_upload = df[['date','time', 'tempC', 'tempF', 'windspeedMiles', 'windspeedKmph',
               'winddirDegree', 'winddir16Point', 'weatherCode', 'visibility',
               'visibilityMiles', 'sigHeight_m', 'swellHeight_m', 'swellHeight_ft',
               'swellDir', 'swellDir16Point', 'swellPeriod_secs', 'waterTemp_C',
               'waterTemp_F']]

In [330]:
import pandas as pd
from sqlalchemy import create_engine, text

In [331]:
username = *
password_db = *
host = *
port = *
dbname = *

engine = create_engine('postgresql://{username}:{password_db}@{host}:{port}/{dbname}'.format(username = username,\
                                                                                            password_db = password_db,\
                                                                                            host = host,\
                                                                                            port = port,\
                                                                                            dbname = dbname))

In [335]:
df_upload.to_sql('temp_raw', con=engine)

In [336]:
query1 = """
INSERT INTO galapagos.tortuga_bay_raw (date, time, tempc, tempf, windspeedmiles, windspeedkmph,
winddirdegree, winddir16point, weathercode, visibilitykm, visibilitymiles, sigheight_m, swellheight_m,\
swellheight_ft, swelldir, swelldir16point, swellperiods_secs, watertempc, watertempf) \
SELECT "date"::date, "time"::int, "tempC"::int, "tempF"::int, "windspeedMiles"::int, "windspeedKmph"::int, \
"winddirDegree"::int, "winddir16Point", "weatherCode"::int, "visibility"::int, "visibilityMiles"::int, \
"sigHeight_m"::float, "swellHeight_m"::float, "swellHeight_ft"::float,"swellDir"::int,\
"swellDir16Point", "swellPeriod_secs"::float, "waterTemp_C"::int, "waterTemp_F"::int from public.temp_raw

ON CONFLICT (date, time) DO \

UPDATE SET tempc = EXCLUDED.tempC, tempf = EXCLUDED.tempF, windspeedmiles = EXCLUDED.windspeedMiles, \
windspeedkmph = EXCLUDED.windspeedKmph, winddirdegree = EXCLUDED.winddirDegree, \
winddir16point = EXCLUDED.winddir16Point, weathercode = EXCLUDED.weatherCode, visibilitykm = EXCLUDED.visibilitykm, \
visibilitymiles = EXCLUDED.visibilityMiles, sigheight_m = EXCLUDED.sigHeight_m, swellheight_m = EXCLUDED.swellHeight_m, \
swellheight_ft = EXCLUDED.swellHeight_ft, swelldir = EXCLUDED.swellDir, swelldir16point = EXCLUDED.swellDir16Point, \
swellperiods_secs = EXCLUDED.swellperiods_secs, watertempc = EXCLUDED.watertempc, watertempf = EXCLUDED.watertempf;
"""

engine.execute(query1)

<sqlalchemy.engine.result.ResultProxy at 0x7f79d753d490>

In [337]:
engine.execute(query2)

<sqlalchemy.engine.result.ResultProxy at 0x7f79d6d81160>

In [None]:
def moving_s3_file(bucket, old_prefix, new_prefix, file_name):
    
    old_name = "{0}/{1}/{2}".format(bucket, old_prefix, file_name)
    
    old_key = "{0}/{1}".format(old_prefix, file_name)
    new_key = "{0}/{1}".format(new_prefix, file_name)
    
    s3.Object(bucket, new_key).copy_from(CopySource=old_name)
    s3.Object(bucket,'old_file_key').delete()

In [342]:
file = 'not_processed/surf_data_20220610_2237.json'
file_name = file.split('/')[-1]

In [343]:
file_name

'surf_data_20220610_2237.json'

In [349]:
bucket = 'galapagos-surf-eu-west-2'
old_prefix = 'not_processed'
new_prefix = 'processed'

In [350]:
old_name = "{0}/{1}/{2}".format(bucket, old_prefix, file_name)

new_name = "{0}/{1}/{2}".format(bucket, new_prefix, file_name)
new_key = "{0}/{1}".format(new_prefix, file_name)

In [351]:
new_key

'processed/surf_data_20220610_2237.json'

In [360]:
s3.Object(bucket, new_key).copy_from(CopySource=old_name)

{'ResponseMetadata': {'RequestId': 'WBG0Y99BQZQKCWF0',
  'HostId': 'utvV5rO3N1wq97XXI6jVYwU6Y6gcnorfy8v9iSHXMjO0ByZx2aZxnoW4WC7MQKrUPUmxH1JO8i8=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'utvV5rO3N1wq97XXI6jVYwU6Y6gcnorfy8v9iSHXMjO0ByZx2aZxnoW4WC7MQKrUPUmxH1JO8i8=',
   'x-amz-request-id': 'WBG0Y99BQZQKCWF0',
   'date': 'Wed, 15 Jun 2022 22:35:11 GMT',
   'content-type': 'application/xml',
   'server': 'AmazonS3',
   'content-length': '234'},
  'RetryAttempts': 1},
 'CopyObjectResult': {'ETag': '"c032a8064782db96ac467cd551588345"',
  'LastModified': datetime.datetime(2022, 6, 15, 22, 35, 11, tzinfo=tzutc())}}

In [None]:
s3.Object('my_bucket','old_file_key').delete()

In [359]:
s3 = boto3.resource('s3',
                    aws_access_key_id = access_key,
                    aws_secret_access_key = secret_access_key

In [368]:
not_processed = list_not_processed_files('galapagos-surf-eu-west-2', 'not_processed', access_key, secret_access_key)

In [369]:
not_processed

['not_processed/surf_data_20220610_2237.json',
 'not_processed/surf_data_20220610_2247.json',
 'not_processed/surf_data_20220611_1638.json',
 'not_processed/surf_data_20220612_1541.json',
 'not_processed/surf_data_20220613_2126.json',
 'not_processed/surf_data_20220614_1928.json',
 'not_processed/surf_data_20220615_2143.json']