In [19]:
import datetime
import pandas as pd
import numpy as np
import json
from sqlalchemy import text
import io
import sys
import requests

from airflow.providers.postgres.hooks.postgres import PostgresHook


In [20]:
# Target table follows the naming scheme t_<topic>_<dataset>.
topic = "siextent"
dataset = "nsidc_g02135"
tbl_name = "t_{}_{}".format(topic, dataset)

# Airflow connection (by conn id) to the Postgres database that holds the table.
db_conn = PostgresHook(postgres_conn_id="arcticdata_pgsql")

# Daily northern-hemisphere sea-ice extent CSV (v3.0) from NSIDC dataset G02135,
# per the URL path; it is fetched by the ingestion cell further down.
SOURCE_URL = (
    "http://masie_web.apps.nsidc.org/pub/DATASETS/NOAA/G02135/"
    "north/daily/data/N_seaice_extent_daily_v3.0.csv"
)


In [21]:
num_records_before= db_conn.get_first(sql=f"select count(*) from {tbl_name}")[0]
print(num_records_before)

max_date= db_conn.get_first(f"SELECT MAX(datetime_date) FROM {tbl_name}")[0]
print(max_date)


[2022-04-11 21:15:09,747] {base.py:79} INFO - Using connection to: id: arcticdata_pgsql. Host: postgres, Port: 5432, Schema: arcticdata, Login: airflow, Password: ***, extra: {}
14214
[2022-04-11 21:15:09,762] {base.py:79} INFO - Using connection to: id: arcticdata_pgsql. Host: postgres, Port: 5432, Schema: arcticdata, Login: airflow, Password: ***, extra: {}
2022-04-04 00:00:00


In [22]:
# Download the source CSV and append only rows newer than the latest stored date.
# Sets `df`, `new_records`, `num_records_after` and `status` ("OK"/"NOK") for reporting.
REQUEST_TIMEOUT_S = 60  # seconds; without a timeout requests.get can block the task forever


def _parse_extent_csv(csv_text, newer_than=None):
    """Parse the downloaded CSV text into a frame ready to append to the table.

    Parameters
    ----------
    csv_text : str
        Raw CSV body. The line right after the header (row index 1) is skipped;
        presumably a units row -- TODO confirm against the source file.
    newer_than : datetime-like or None
        Keep only rows whose ``datetime_date`` is strictly after this value.
        ``None`` (what MAX() returns on an empty table) keeps every row.

    Returns
    -------
    pandas.DataFrame
        Column names lower-cased, stripped and snake_cased. The ``year``,
        ``month`` and ``day`` columns are replaced by a single ``datetime_date``
        column, appended last.
    """
    parsed = pd.read_csv(io.StringIO(csv_text), skiprows=[1], skipinitialspace=True)
    parsed = parsed.rename(columns=lambda c: c.lower().strip().replace(" ", "_"))
    parsed["datetime_date"] = pd.to_datetime(parsed[["year", "month", "day"]])
    parsed = parsed.drop(columns=["year", "month", "day"])
    if newer_than is not None:
        parsed = parsed[parsed["datetime_date"] > newer_than]
    return parsed


r = requests.get(SOURCE_URL, timeout=REQUEST_TIMEOUT_S)

if r.ok:
    df = _parse_extent_csv(r.text, newer_than=max_date)
    new_records = len(df)
    # Skip the write entirely when there is nothing new to append.
    if new_records:
        df.to_sql(tbl_name, con=db_conn.get_sqlalchemy_engine(), if_exists="append", index=False)

    num_records_after = db_conn.get_first(sql=f"select count(*) from {tbl_name}")[0]
    print(num_records_after)

    status = "OK"
else:
    # Download failed: nothing was written, so the table size is unchanged.
    new_records = 0
    num_records_after = num_records_before
    status = "NOK"

[2022-04-11 21:15:36,202] {base.py:79} INFO - Using connection to: id: arcticdata_pgsql. Host: postgres, Port: 5432, Schema: arcticdata, Login: airflow, Password: ***, extra: {}
[2022-04-11 21:15:36,251] {base.py:79} INFO - Using connection to: id: arcticdata_pgsql. Host: postgres, Port: 5432, Schema: arcticdata, Login: airflow, Password: ***, extra: {}
14220


In [8]:
# Idempotent safety net: append only rows of `df` newer than what the table now
# holds. An unconditional `df.to_sql(...)` here would re-insert the rows the
# ingestion cell already wrote, duplicating them on every Restart & Run All.
# It is also skipped when the download failed, because `df` is undefined then.
if status == "OK":
    latest_in_db = db_conn.get_first(f"SELECT MAX(datetime_date) FROM {tbl_name}")[0]
    pending = df if latest_in_db is None else df[df["datetime_date"] > latest_in_db]
    if len(pending):
        pending.to_sql(name=tbl_name, con=db_conn.get_sqlalchemy_engine(), index=False, if_exists="append")

[2022-04-11 15:30:12,309] {base.py:79} INFO - Using connection to: id: arcticdata_pgsql. Host: postgres, Port: 5432, Schema: arcticdata, Login: airflow, Password: ***, extra: {}
