In [3]:
# Show the (BigQuery-style) DDL for the external table over the exported parquet files.
ddl_path = "external_commodity_data_ddl.sql"
with open(ddl_path, "r") as ddl_file:
    ddl_text = ddl_file.read()
print(ddl_text)

create or replace external table `de-z-camp.de_ag.external_commodity_data`
options (
  format = 'PARQUET',
  uris = ['gs://de_ag_export_bucket_0/commodity_*.parquet']
);


In [1]:

import os
import pandas as pd
import requests
import json
from pathlib import Path
import schema_to_pd_dtype

In [None]:
# USDA FAS Export Sales Reporting (ESR) reference endpoints, keyed by the name
# used for the downloaded payload.
urls = dict(
    commodities="https://apps.fas.usda.gov/OpenData/api/esr/commodities",
    units="https://apps.fas.usda.gov/OpenData/api/esr/unitsOfMeasure",
    regions="https://apps.fas.usda.gov/OpenData/api/esr/regions",
    countries="https://apps.fas.usda.gov/OpenData/api/esr/countries",
)

In [None]:
# API key for the USDA FAS OpenData API. USDAReader reads USDA_API_KEY, so prefer
# that name and keep the old lowercase one as a fallback (environment variable
# names are case-sensitive outside Windows).
key = os.getenv("USDA_API_KEY") or os.getenv("usda_api_key")
headers = {"API_KEY": key,
           "Accept": "application/json"}

In [None]:
# Download every reference table listed in `urls`, keyed by the same names.
json_dict = dict()

for k, v in urls.items():
    # requests has no default timeout; bound the wait.
    response = requests.get(v, headers=headers, timeout=60)
    # Fail loudly instead of storing an error payload (e.g. a 401 for a
    # missing API key) as if it were reference data.
    response.raise_for_status()
    json_dict[k] = json.loads(response.text)

In [None]:
# Sanity check: which reference tables were downloaded.
json_dict.keys()

In [None]:
# `ref_schema` was otherwise only created in a later cell, so this cell raised
# NameError on a fresh kernel. Build it here for the "countries" table: that is
# the value the later cell's leaked loop variable `k` ends up holding (last key of `urls`).
ref_schema = schema_to_pd_dtype.schema_get(schema="countries", file="source_schemas.json")
ref_schema[0]

In [None]:

# Scratch build of an empty, typed reference frame. Each schema column is cast
# to its declared dtype; columns listed in ref_schema[1] are parsed as timestamps
# instead.
# NOTE(review): the same logic is repeated in a later cell.
ref_dtypes = ref_schema[0]
ref_date_cols = ref_schema[1]
ref = pd.DataFrame(columns=list(ref_dtypes.keys()))
for column_name in ref.columns:
    if column_name not in ref_date_cols:
        ref[column_name] = ref[column_name].astype(ref_dtypes[column_name])
    else:
        ref[column_name] = pd.to_datetime(ref[column_name], format="%Y-%m-%dT%H:%M:%S")

In [None]:
from schema_to_pd_dtype import schema_get

# Reference table to build. This used to read the loop variable `k` leaked from
# another cell (its last value is "countries"), which breaks on a fresh kernel or
# if cells run in a different order. Name the table explicitly instead.
ref_name = "countries"
ref_schema = schema_get(schema=ref_name, file="source_schemas.json")

# Empty frame with the schema's columns, typed column by column: columns listed
# in ref_schema[1] are parsed as timestamps, the rest cast to ref_schema[0][col].
ref = pd.DataFrame(columns=list(ref_schema[0].keys()))
for col in ref.columns:
    if col in ref_schema[1]:
        ref[col] = pd.to_datetime(ref[col], format="%Y-%m-%dT%H:%M:%S")
    else:
        ref[col] = ref[col].astype(ref_schema[0][col])

# NOTE(review): concatenating with an empty template does not guarantee the
# template dtypes survive (pandas picks a common dtype per column) — verify ref.dtypes.
ref = pd.concat([ref, pd.DataFrame.from_records(json_dict[ref_name])])

In [None]:
# Display the reference table built above.
ref

In [None]:
data_release_url = "https://apps.fas.usda.gov/OpenData/api/esr/datareleasedates"
# requests has no default timeout; bound the wait, and fail on HTTP errors
# rather than caching an error body as release dates.
dr = requests.get(data_release_url, headers=headers, timeout=60)
dr.raise_for_status()

In [None]:
# Weekly export data for commodity 801, all countries, market year 2023.
market_url = "https://apps.fas.usda.gov/OpenData/api/esr/exports/commodityCode/801/allCountries/marketYear/2023"

# requests has no default timeout; bound the wait and surface HTTP errors.
market = requests.get(market_url, headers=headers, timeout=60)
market.raise_for_status()

In [None]:
# Decode the raw response bytes as JSON.
market_json = json.loads(market.content)

In [None]:
# Persist each downloaded reference payload as pretty-printed JSON named after its key.
for table_name, payload in json_dict.items():
    with open(f"{table_name}.json", "w") as out_file:
        json.dump(payload, out_file, indent=4)

In [None]:
data_release = json.loads(dr.text)

# Cache the raw release-date payload on disk as pretty-printed JSON.
with open("data_release_dates.json", "w") as out_file:
    json.dump(data_release, out_file, indent=4)

In [None]:
# Reload the cached release-date payload as a DataFrame (timestamp columns are
# presumably still strings at this point; they are parsed explicitly afterwards).
drd = pd.read_json("data_release_dates.json")

In [None]:
# Parse the API's "YYYY-MM-DDTHH:MM:SS" timestamp columns into datetime64.
date_cols = ["marketYearStart", "marketYearEnd", "releaseTimeStamp"]
drd = drd.assign(
    **{name: pd.to_datetime(drd[name], format="%Y-%m-%dT%H:%M:%S") for name in date_cols}
)

In [None]:
# Distinct (commodityCode, marketYear) pairs present in the release-date data.
drd[["commodityCode", "marketYear"]].drop_duplicates()

In [None]:
from pathlib import Path, PurePosixPath

In [None]:
# `__file__` only exists when this code runs as a script/module. A Jupyter
# kernel does not define it, so fall back to the notebook's working directory.
try:
    data_path = Path(__file__).parent.resolve() / "data"
except NameError:
    data_path = Path.cwd().resolve() / "data"

In [None]:
from pathlib import Path, PurePosixPath

# `__file__` is undefined inside a Jupyter kernel; fall back to the working directory.
try:
    data_path = Path(__file__).parent.resolve() / "data"
except NameError:
    data_path = Path.cwd().resolve() / "data"

# Download one commodity/market-year pair (all countries).
cc = 101
year = 2022
url = f"https://apps.fas.usda.gov/OpenData/api/esr/exports/commodityCode/{cc}/allCountries/marketYear/{year}"
print(url)
r = requests.get(url, headers=headers, timeout=60)
r.raise_for_status()  # don't save an error payload as if it were data
r_json = json.loads(r.content)

# NOTE(review): this writes to the working directory, not `data_path` — confirm intended.
with open(f"{cc}_{year}.json", "w") as w:
    w.write(json.dumps(r_json, indent=4))

In [None]:
# Specify country
# https://apps.fas.usda.gov/OpenData/api/esr/exports/commodityCode/101/countryCode/1/marketYear/2022

# All countries
# 'https://apps.fas.usda.gov/OpenData/api/esr/exports/commodityCode/101/allCountries/marketYear/2022'

# Try with commodity 101: download only its first market year as a smoke test
# (previously a `break` at the end of the loop body).
# The output folder can be overridden with the DE_AG_DATA_PATH environment variable.
data_path = Path(os.getenv("DE_AG_DATA_PATH", "C:/Users/daniel/Documents/ag_export_project/de_ag_export/data/"))
cc = 101
drd_101 = drd.loc[drd["commodityCode"] == cc]
for year in drd_101["marketYear"].values[:1]:
    url = f"https://apps.fas.usda.gov/OpenData/api/esr/exports/commodityCode/{cc}/allCountries/marketYear/{year}"
    print(url)
    r = requests.get(url, headers=headers, timeout=60)
    r.raise_for_status()  # don't save an error payload as if it were data
    r_json = json.loads(r.content)

    with open(data_path / f"{cc}_{year}.json", "w") as w:
        w.write(json.dumps(r_json, indent=4))

In [None]:
# Release date stamped on the first returned record (IndexError if the API returned an empty list).
r_json[0]["dataReleaseDate"] 

In [None]:
# Snapshot the current release dates; data_release_date() reads this file back for comparison.
drd.to_csv("previous_data_release_dates.csv", index=False)

In [None]:
# Read the snapshot back with the same columns and date parsing data_release_date() uses.
pd.read_csv(
    "previous_data_release_dates.csv",
    parse_dates=["releaseTimeStamp"],
    usecols=["commodityCode", "marketYear", "releaseTimeStamp"],
)

In [7]:
class USDAReader():
    """Minimal client for the USDA FAS OpenData API.

    Every request carries the ``API_KEY`` header and asks for JSON.
    """

    def __init__(self, api_key=None, timeout=60):
        """
        Args:
            api_key: API key to send. Defaults to the ``USDA_API_KEY``
                environment variable (None if unset).
            timeout: seconds passed to ``requests.get``. requests has no default
                timeout, so without one a stalled connection hangs forever.
        """
        if api_key is None:
            api_key = os.getenv("USDA_API_KEY")
        self.headers = {"API_KEY": api_key, "Accept": "application/json"}
        self.timeout = timeout

    def read(self, url: str) -> "list | dict":
        """GET ``url`` and return its JSON-decoded body.

        Raises:
            ValueError: if the response status is an HTTP error (4xx/5xx). The
                message includes the URL, status code and reason.
        """
        response = requests.get(url, headers=self.headers, timeout=self.timeout)
        if not response.ok:
            raise ValueError(
                f"Bad response for {url}: {response.status_code} {response.reason}"
            )
        return json.loads(response.content)

    
    

In [None]:
# NOTE(review): `json_response` is only a local variable inside commodity_data_get;
# at notebook scope it is undefined on a fresh kernel, so this raises NameError
# unless it was assigned interactively — TODO point it at a real payload
# (e.g. r_json or market_json).
pd.json_normalize(json_response)

In [8]:

import io


def commodity_data_get(commodity_years: pd.DataFrame, data_path: "Path | None" = None) -> None:
    """Download weekly export data for each (commodity, market year) pair and save it as parquet.

    Args:
        commodity_years: frame with ``commodityCode`` and ``marketYear`` columns.
            One API request is made per row.
        data_path: output directory. Defaults to the ``DE_AG_DATA_PATH``
            environment variable, falling back to the original hard-coded path.

    Writes one ``commodity_{cc}_{year}.parquet`` file per row. Propagates the
    ValueError that USDAReader.read raises on an HTTP error response.
    """
    if data_path is None:
        data_path = os.getenv(
            "DE_AG_DATA_PATH",
            "C:/Users/daniel/Documents/ag_export_project/de_ag_export/data/",
        )
    data_path = Path(data_path)

    usda_reader = USDAReader()
    # itertuples keeps each column's own dtype (iterrows builds a per-row Series,
    # which can upcast ints when other columns are float).
    pairs = commodity_years[["commodityCode", "marketYear"]].itertuples(index=False, name=None)
    for cc, year in pairs:
        url = f"https://apps.fas.usda.gov/OpenData/api/esr/exports/commodityCode/{cc}/allCountries/marketYear/{year}"
        json_response = usda_reader.read(url=url)
        commodity = pd.DataFrame.from_records(json_response)
        # Return value is ignored: assumes df_dtype_set casts `commodity` in place — confirm.
        schema_to_pd_dtype.df_dtype_set(df=commodity, schema="commodity_data", file="source_schemas.json")
        commodity.to_parquet(data_path / f"commodity_{cc}_{year}.parquet")

# Small sample of (commodity, market year) pairs to exercise the download path.
sample_commodity_years = pd.DataFrame(
    {
        "commodityCode": [101, 102, 1608],
        "marketYear": [2022, 2022, 1998],
    }
)

commodity_data_get(sample_commodity_years)

In [9]:
# Output folder of the commodity downloads; override with DE_AG_DATA_PATH.
DATA_PATH = Path(os.getenv("DE_AG_DATA_PATH", "C:/Users/daniel/Documents/ag_export_project/de_ag_export/data/"))

# Spot-check one of the files written by commodity_data_get.
df = pd.read_parquet(DATA_PATH / "commodity_1608_1998.parquet")

In [10]:
# Inspect column dtypes after the parquet round-trip.
df.dtypes

commodityCode                        Int64
countryCode                          Int64
weeklyExports                      float64
accumulatedExports                 float64
outstandingSales                   float64
grossNewSales                      float64
currentMYNetSales                  float64
currentMYTotalCommitment           float64
nextMYOutstandingSales             float64
nextMYNetSales                     float64
unitId                               Int64
weekEndingDate              datetime64[ns]
dtype: object

In [5]:
# NOTE(review): duplicate dtype check (its execution count shows it ran earlier than the cell above).
df.dtypes

commodityCode                        Int64
countryCode                          Int64
weeklyExports                      float64
accumulatedExports                 float64
outstandingSales                   float64
grossNewSales                      float64
currentMYNetSales                  float64
currentMYTotalCommitment           float64
nextMYOutstandingSales             float64
nextMYNetSales                     float64
unitId                               Int64
weekEndingDate              datetime64[ns]
dtype: object

In [4]:
# Apply the commodity_data schema dtypes to df. The return value is not used,
# so this presumably mutates df in place — confirm against schema_to_pd_dtype.
schema_to_pd_dtype.df_dtype_set(df=df, schema="commodity_data", file="source_schemas.json")

In [3]:
def data_release_date(release_json="data_release_dates.json",
                      previous_csv="previous_data_release_dates.csv") -> pd.DataFrame:
    """Join current data-release timestamps with the previously recorded ones.

    Args:
        release_json: path of the cached release-dates payload (JSON records with
            at least commodityCode, marketYear and releaseTimeStamp).
        previous_csv: path of the snapshot saved on the previous run.

    Returns:
        DataFrame that always contains ``releaseTimeStamp`` and
        ``previousReleaseTimeStamp``. If ``previous_csv`` does not exist (first
        run), the result is ``commodityCode``, ``marketYear``,
        ``releaseTimeStamp`` plus an all-NaT ``previousReleaseTimeStamp``, so
        callers can compare the two columns unconditionally. Otherwise it is an
        outer merge on (commodityCode, marketYear): pairs present in only one
        source get missing values on the other side.
    """
    used_cols = ["commodityCode", "marketYear", "releaseTimeStamp"]
    drd = pd.read_json(release_json)
    drd["releaseTimeStamp"] = pd.to_datetime(drd["releaseTimeStamp"], format="%Y-%m-%dT%H:%M:%S")

    # Pull the last recorded release date per commodity and year for comparison.
    try:
        p_drd = pd.read_csv(previous_csv, usecols=used_cols, parse_dates=["releaseTimeStamp"])
    except FileNotFoundError:
        # First run: nothing to compare against. Still provide the column so
        # downstream filters on previousReleaseTimeStamp don't raise KeyError.
        return drd[used_cols].assign(previousReleaseTimeStamp=pd.NaT)

    p_drd = p_drd.rename(columns={"releaseTimeStamp": "previousReleaseTimeStamp"})
    return drd.merge(right=p_drd, on=["commodityCode", "marketYear"], how="outer")

# Current release timestamps per (commodityCode, marketYear), joined with the previously saved ones when available.
commodity_years = data_release_date()

In [7]:
# Pairs whose current release timestamp is known and differs from the previous
# snapshot (a missing previous value compares as different). These are presumably
# the ones to re-download.
release_changed = (
    commodity_years["releaseTimeStamp"].notna()
    & commodity_years["releaseTimeStamp"].ne(commodity_years["previousReleaseTimeStamp"])
)
commodity_years.loc[release_changed, ["commodityCode", "marketYear"]]

Unnamed: 0,commodityCode,marketYear


In [9]:
# Delete previously downloaded commodity parquet files.
# The folder can be overridden with the DE_AG_DATA_PATH environment variable.
DATA_PATH = Path(os.getenv("DE_AG_DATA_PATH", "C:/Users/daniel/Documents/ag_export_project/de_ag_export/data/"))
for file in DATA_PATH.glob("commodity_*.parquet"):
    file.unlink()

In [2]:
# Reference-data folder and the commodity lookup table stored there.
REF_PATH = Path("C:/Users/daniel/Documents/ag_export_project/de_ag_export/ref/")
commodities_file = REF_PATH / "commodities.parquet"
commodities = pd.read_parquet(commodities_file)
commodities

Unnamed: 0,commodityCode,commodityName,unitId
0,101,Wheat - HRW,1
1,102,Wheat - SRW,1
2,103,Wheat - HRS,1
3,104,Wheat - White,1
4,105,Wheat - Durum,1
5,106,Wheat - Mixed,1
6,107,All Wheat,1
7,201,Wheat Products,1
8,301,Barley,1
9,401,Corn,1


In [4]:
# Per-commodity totals (aggregate column `f0_`), matched to the category sheet by name further down.
co_ref = pd.read_json(Path(REF_PATH) / "commodity_sum.json")

In [10]:
# Hand-maintained commodity → category sheet in the dbt seeds folder.
seeds_dir = Path.cwd() / "de_ag_dbt" / "seeds"
co_cat = pd.read_excel(seeds_dir / "commodity_categories.xlsx")

In [13]:
# Preview the commodity totals.
co_ref.head()

Unnamed: 0,commodityCode,commodityName,f0_
0,1301,Cotton- Am Pima,12984813
1,801,Soybeans,855986509
2,301,Barley,7812551
3,1601,Cattle Hides - Whole - Excluding Wet Blues,638489292
4,1602,Calf Skins - Whole - Excluding Wet Blues,8898522


In [14]:
# Preview the category sheet.
co_cat.head()

Unnamed: 0,Name,Total,Category,Category Total Marker
0,Cotton- Am Pima,12984813,Cotton,
1,Soybeans,855986509,Soybeans,
2,Barley,7812551,Barley,
3,Cattle Hides - Whole - Excluding Wet Blues,638489292,Hides or Skins,
4,Calf Skins - Whole - Excluding Wet Blues,8898522,Hides or Skins,


In [16]:
# Attach a category to each commodity by matching names against the seed sheet.
# The outer join keeps unmatched names from either side (NaN on the other side).
co = pd.merge(
    co_ref.loc[:, ["commodityCode", "commodityName"]],
    co_cat,
    how="outer",
    left_on="commodityName",
    right_on="Name",
)

In [22]:
# Drop the sheet's join key (same values as commodityName for matched rows) and its Total column.
co = co.drop(columns=["Name", "Total"])

In [24]:
# Rename the generic "Oil" category to something more descriptive.
co["Category"] = co["Category"].replace("Oil", "Grain/Seed Oil")

In [26]:
# Write the categorised commodities as a CSV seed next to the source sheet.
seed_csv = Path.cwd() / "de_ag_dbt" / "seeds" / "commodity_categories.csv"
co.to_csv(seed_csv, index=False)