### Soybean Futures Contracts Pricing Model
#### Data Preprocessing

#### Libraries and Imports

In [10]:
import os
import json

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from datetime import datetime, timedelta
from pathlib import Path


#### WASDE Data- Functions

In [11]:
def filter_wasde_by_commodity(data):
    """
    Splits raw WASDE rows into one DataFrame per tracked world report.

    For every report title below, keeps the rows whose "ReportTitle" equals the
    title and whose "Region" is one of the regions listed for it. Both
    "Major exporters" and "Major Exporters" appear in the shared region list, so
    either capitalisation of that label is accepted there.

    Returns a list of five DataFrames in the order corn, wheat, soybean,
    soybean meal, soybean oil. A frame is empty when nothing matches.
    """
    aggregate_regions = ["World", "Major exporters", "United States", "Major Exporters"]
    soybean_regions = ["World", "Argentina", "Brazil", "United States", "Major Exporters"]

    # Insertion order of this mapping defines the order of the returned list.
    regions_by_report = {
        "World Corn Supply and Use": aggregate_regions,
        "World Wheat Supply and Use": aggregate_regions,
        "World Soybean Supply and Use": soybean_regions,
        "World Soybean Meal Supply and Use": aggregate_regions,
        "World Soybean Oil Supply and Use": aggregate_regions,
    }

    return [
        data[(data["ReportTitle"] == report_title) & (data["Region"].isin(allowed))]
        for report_title, allowed in regions_by_report.items()
    ]

def process_wasde_by_path(path, wasde_data):
    """
    Appends the projected rows of one WASDE CSV export to ``wasde_data``.

    The CSV at ``path`` is read, narrowed to the tracked reports/regions by
    ``filter_wasde_by_commodity`` and restricted to rows whose "ProjEstFlag" is
    "Proj.". Within each report the rows are emitted grouped by ascending
    "ReleaseDate" (the grouping drops rows with a missing release date). Only
    the columns in ``kept_columns`` are retained and "ReleaseDate" is renamed
    to "Report Date".

    Returns the combined DataFrame with a fresh 0..n-1 index, or ``wasde_data``
    itself (unchanged) when the file contributes no rows.
    """
    kept_columns = ["ReleaseDate", "Commodity", "Region", "Attribute", "Value", "Unit"]

    data = pd.read_csv(path, low_memory=False)

    # Collect every piece first and concatenate once: growing the frame with
    # one pd.concat per group copies the whole accumulated frame each time.
    pieces = []
    for data_set in filter_wasde_by_commodity(data):
        projected = data_set[data_set["ProjEstFlag"] == "Proj."]
        for _, group in projected.groupby("ReleaseDate"):
            pieces.append(
                group[kept_columns].rename(columns={"ReleaseDate": "Report Date"})
            )

    if not pieces:
        return wasde_data
    return pd.concat([wasde_data, *pieces], ignore_index=True)

def clean_wasde_data(data):
    """
    Returns a cleaned, sorted and typed copy of the combined WASDE rows.

    Steps:
      * "Report Date" is parsed and truncated to a calendar date.
      * "Attribute" labels are normalised through ``conversion_dict``.
      * Rows with a missing "Report Date" or "Commodity" are dropped.
      * Rows are ordered by ("Report Date", "Commodity"); rows sharing both keys
        keep their input order.
      * Columns are cast to the dtypes in ``dtype_wasde_data``.

    The input frame is not modified. An empty input yields an empty, typed
    frame instead of raising.

    Returns a new DataFrame with a 0..n-1 index.
    """
    conversion_dict = {
        "Domestic Feed": "Feed",
        "Domestic Crush": "Crush",
        "Domestic Total": "Total Use",
    }
    dtype_wasde_data = {
        "Report Date": "datetime64[ns]",
        "Commodity": "category",
        "Region": "category",
        "Attribute": "category",
        "Value": "float64",
    }
    sort_keys = ["Report Date", "Commodity"]

    # Work on a copy so the caller's frame keeps its original values.
    cleaned = data.copy()
    cleaned["Report Date"] = pd.to_datetime(cleaned["Report Date"]).dt.date
    cleaned["Attribute"] = cleaned["Attribute"].replace(conversion_dict)

    # dropna + stable sort gives the same rows and order as grouping on the
    # keys and re-concatenating, without failing when there are no groups.
    cleaned = cleaned.dropna(subset=sort_keys).sort_values(by=sort_keys, kind="stable")
    cleaned = cleaned.astype(dtype_wasde_data)

    return cleaned.reset_index(drop=True)


def process_wasde_data(excel_2000_2010, csv_2010_2020, csv_2021_2024, output_path):
    """
    Aggregates WASDE data from multiple sources and saves it to a Parquet file.

    Parameters:
      excel_2000_2010: path of the Excel workbook; every sheet is loaded and
        stacked, and its "Country" column is renamed to "Region".
      csv_2010_2020: iterable of CSV paths, processed in the given order.
      csv_2021_2024: directory whose ``*.csv`` files are processed in sorted
        path order, so the result does not depend on the order in which the
        filesystem lists them.
      output_path: destination of the cleaned Parquet file (written without
        the index).
    """
    # 2000 - 2010: sheet_name=None returns a dict holding every sheet.
    sheets = pd.read_excel(excel_2000_2010, sheet_name=None)
    wasde_data = pd.concat(sheets.values(), ignore_index=True)
    wasde_data = wasde_data.rename(columns={"Country": "Region"})

    # 2010 - 2020: explicit list of CSV exports.
    for path in csv_2010_2020:
        wasde_data = process_wasde_by_path(path, wasde_data)

    # 2021 - 2024: every CSV in the directory, in a deterministic order.
    for file_path in sorted(Path(csv_2021_2024).glob("*.csv")):
        wasde_data = process_wasde_by_path(file_path, wasde_data)

    wasde_data = clean_wasde_data(wasde_data)
    wasde_data.to_parquet(output_path, index=False)

In [3]:
def aggregate_wasde_data(input_path, output_path):
    """
    Pivots long-format WASDE rows into one balance-sheet row per report.

    Reads the Parquet file at ``input_path``, drops its "Unit" column and groups
    the rows by ("Report Date", "Commodity", "Region"). Each group becomes one
    output row whose balance-sheet columns are filled from the group's
    "Attribute"/"Value" pairs (see ``attribute_to_column``; when an attribute
    occurs more than once in a group the last value wins, and attributes not in
    the table are ignored). "STU" is ending stocks divided by total use,
    rounded to 4 decimals.

    Commodity labels are then normalised through ``replacements``, columns are
    cast to ``dtype_aggregate_data`` and the result is written to
    ``output_path`` (Parquet, without the index).
    """
    attribute_to_column = {
        "Beginning Stocks": "Beginning Stocks",
        "Production": "Production",
        "Imports": "Imports",
        "Exports": "Exports",
        "Feed": "Feed/Crush",
        "Crush": "Feed/Crush",
        "Total Use": "Total Use",
        "Use, Total": "Total Use",
        "Ending Stocks": "Ending Stocks",
    }
    replacements = {
        "OilSeed, Soybeans": "Soybeans",
        "Oilseed, Soybean": "Soybeans",
        "Meal, Soybeans": "Soybean Meal",
        "Oil, Soybeans": "Soybean Oil",
    }
    dtype_aggregate_data = {
        "Report Date": "datetime64[ns]",
        "Commodity": "category",
        "Region": "category",
        "Beginning Stocks": "float64",
        "Production": "float64",
        "Imports": "float64",
        "Exports": "float64",
        "Feed/Crush": "float64",
        "Total Use": "float64",
        "Ending Stocks": "float64",
        "STU": "float64",
    }
    key_columns = ["Report Date", "Commodity", "Region"]
    # Everything after the three keys and before "STU" is a balance-sheet value.
    value_columns = [
        column
        for column in dtype_aggregate_data
        if column not in key_columns and column != "STU"
    ]

    source = pd.read_parquet(input_path).drop(columns=["Unit"])

    processed_rows = []
    # observed=True: only key combinations that actually occur are wanted when
    # the key columns are categorical.
    for (report_date, commodity, region), group in source.groupby(
        key_columns, observed=True
    ):
        aggregate_row = {
            "Report Date": report_date,
            "Commodity": commodity,
            "Region": region,
            **dict.fromkeys(value_columns),
            "STU": None,
        }
        for attribute, value in zip(group["Attribute"], group["Value"]):
            column = attribute_to_column.get(attribute)
            if column is not None:
                aggregate_row[column] = value

        aggregate_row["STU"] = _stocks_to_use(
            aggregate_row["Ending Stocks"], aggregate_row["Total Use"]
        )
        processed_rows.append(aggregate_row)

    # Passing the column list keeps the schema intact when there are no rows.
    processed_data = pd.DataFrame(processed_rows, columns=list(dtype_aggregate_data))
    processed_data["Commodity"] = processed_data["Commodity"].replace(replacements)

    processed_data = processed_data.astype(dtype_aggregate_data)
    processed_data.to_parquet(output_path, index=False)


def _stocks_to_use(ending_stocks, total_use):
    """Returns ending stocks / total use rounded to 4 decimals, or None if undefined."""
    if ending_stocks is None or total_use is None:
        return None
    if pd.isna(ending_stocks) or pd.isna(total_use):
        return None
    if total_use == 0:
        # The ratio is undefined; leave it missing instead of dividing by zero.
        return None
    # A zero ending stock is a valid observation and yields a ratio of 0.0.
    return round(ending_stocks / total_use, 4)

In [4]:
def filter_wasde_soybeans(data_path, output_path):
    """
    Builds one wide row per WASDE report date with the soybean model inputs.

    Reads the aggregated Parquet file at ``data_path`` and, for every distinct
    "Report Date", collects:
      * "STU" and "Production" of "Soybeans" for the United States, Argentina
        and Brazil (column suffixes US / AR / BR),
      * "STU" of "Corn" for the United States ("STU, Corn").
    Values that are not present for a report stay missing. "Report Month" is
    the report's month, stored as a timestamp after the final cast.

    The result is written to ``output_path`` (Parquet, without the index).
    """
    suffix_by_region = {"United States": "US", "Argentina": "AR", "Brazil": "BR"}
    column_types = {
        "Report Date": "datetime64[ns]",
        "Report Month": "datetime64[ns]",
        "STU, US": "float64",
        "STU, AR": "float64",
        "STU, BR": "float64",
        "STU, Corn": "float64",
        "Production, US": "float64",
        "Production, AR": "float64",
        "Production, BR": "float64",
    }

    frame = pd.read_parquet(data_path)
    frame["Report Date"] = pd.to_datetime(frame["Report Date"])
    frame["Report Month"] = frame["Report Date"].dt.to_period("M")

    wide_rows = []
    for report_date, per_report in frame.groupby("Report Date"):
        entry = {
            "Report Date": report_date,
            "Report Month": report_date.to_period("M"),
            "STU, US": None,
            "STU, AR": None,
            "STU, BR": None,
            "STU, Corn": None,
            "Production, US": None,
            "Production, AR": None,
            "Production, BR": None,
        }

        observations = zip(
            per_report["Commodity"],
            per_report["Region"],
            per_report["STU"],
            per_report["Production"],
        )
        for commodity, region, stu, production in observations:
            if commodity == "Soybeans":
                suffix = suffix_by_region.get(region)
                if suffix is not None:
                    entry[f"STU, {suffix}"] = stu
                    entry[f"Production, {suffix}"] = production
            elif commodity == "Corn" and region == "United States":
                entry["STU, Corn"] = stu

        wide_rows.append(entry)

    result = pd.DataFrame(wide_rows).astype(column_types)
    result.to_parquet(output_path, index=False)

def append_indicators(data_path, indicator_path, output_path):
    """
    Joins monthly indicator columns onto the per-report soybean rows.

    The Parquet file at ``data_path`` and the CSV at ``indicator_path`` are
    matched on calendar month: the month of "Report Date" on the left and the
    month of the CSV's "Date" column on the right (left join, so every report
    row is kept and reports without a matching indicator row get missing
    values). The helper join columns "Report Month" and "Date" are dropped
    before the columns are cast to ``dtype_merged_data`` and the result is
    written to ``output_path`` (Parquet, without the index).

    Every column named in ``dtype_merged_data`` must exist after the merge,
    otherwise the cast raises.
    """
    dtype_merged_data = {
        # Nanosecond resolution, as used for every other date column in this
        # file; "datetime64[D]" is not a resolution pandas >= 2.0 can cast to.
        "Report Date": "datetime64[ns]",
        "STU, US": "float64",
        "STU, AR": "float64",
        "STU, BR": "float64",
        "STU, Corn": "float64",
        "Production, US": "float64",
        "Production, AR": "float64",
        "Production, BR": "float64",
        "GDP (Bn USD)": "float64",
        "Gold": "float64",
        "DX": "float64",
        "Crude": "float64",
        # NOTE(review): "USD?BRL" looks like a mangled "USD/BRL"; it has to
        # match the CSV header exactly - confirm against the indicator file.
        "USD?BRL": "float64",
    }

    data = pd.read_parquet(data_path)
    indicators = pd.read_csv(indicator_path, low_memory=False)

    # Reduce both sides to a monthly period (YYYY-MM) so they can be joined.
    data["Report Month"] = pd.to_datetime(data["Report Date"]).dt.to_period("M")
    indicators["Date"] = pd.to_datetime(indicators["Date"]).dt.to_period("M")

    merged_data = pd.merge(
        data, indicators, how="left", left_on="Report Month", right_on="Date"
    )
    merged_data = merged_data.drop(columns=["Report Month", "Date"])
    merged_data = merged_data.astype(dtype_merged_data)
    merged_data.to_parquet(output_path, index=False)


#### WASDE - Execution

In [12]:
# Input locations of the raw WASDE exports (one Excel workbook, an explicit
# list of CSV files, and a directory scanned for *.csv files) followed by the
# intermediate and final output files of the WASDE pipeline.
wasde_2000_2010 = "../data/raw/wasde/2000-2010/World_WASDE_2000-2010.xlsx"
wasde_2010_2020 = [
    "../data/raw/wasde/2010-2020/WASDE_2010-2015.csv",
    "../data/raw/wasde/2010-2020/WASDE_2016-2020.csv",
]
wasde_2021_2024 = "../data/raw/wasde/2021-2024"
processed_data_path = "../data/interim/wasde.parquet"
aggregated_data_path = "../data/interim/wasde_aggregate.parquet"
filtered_soybean_data_path = "../data/interim/wasde_soybeans.parquet"
indicators_path = "../data/interim/macroeconomic_indicators.csv"
output_path = "../data/processed/soybeans_model_training_data.parquet"

# Run the pipeline. Each step reads the file written by the previous one:
# raw sources -> wasde.parquet -> wasde_aggregate.parquet
# -> wasde_soybeans.parquet -> soybeans_model_training_data.parquet
process_wasde_data(
    wasde_2000_2010, wasde_2010_2020, wasde_2021_2024, processed_data_path
)
aggregate_wasde_data(processed_data_path, aggregated_data_path)
filter_wasde_soybeans(aggregated_data_path, filtered_soybean_data_path)
append_indicators(filtered_soybean_data_path, indicators_path, output_path)

#### Historical Pricing Data- Functions

In [6]:
def get_sorted_contract_names(csv_files, symbols, years):
    """Returns the available contract CSV names in year-then-symbol order.

    Candidate names are built as ``<symbol><yy>.csv`` (``yy`` being the last two
    characters of the year) for every year in ``years`` and, within a year, for
    every symbol in ``symbols``. Only candidates present in ``csv_files`` are
    returned. The result is ordered by expiration as long as ``years`` is
    ascending and ``symbols`` is listed in calendar order.
    """
    # A set makes each membership test O(1) instead of scanning the file list.
    available = set(csv_files)
    candidates = (
        f"{symbol}{str(year)[-2:]}.csv" for year in years for symbol in symbols
    )
    return [name for name in candidates if name in available]


def generate_price_data(row):
    """Serialises a row's "High"/"Low" values as a JSON string.

    Returns ``'{"high": <float>, "low": <float>}'`` when both values are
    present, and ``'{}'`` when either one is missing.
    """
    high, low = row["High"], row["Low"]

    payload = {}
    if not (pd.isna(high) or pd.isna(low)):
        payload = {"high": float(high), "low": float(low)}
    return json.dumps(payload)


def process_raw_price_data(csv_dir, contracts_sorted_by_expiration, all_price_data):
    """Adds one JSON price column per contract file to the date frame.

    ``all_price_data`` must have a "Date" column; it is converted to datetime
    in place. For every file name in ``contracts_sorted_by_expiration`` the CSV
    in ``csv_dir`` is read without its last row (presumably a footer line in
    the export - confirm), its "Time" column becomes "Date", and each row's
    high/low is encoded with ``generate_price_data``. The encoded prices are
    left-joined on "Date" into a column named after the file without ".csv".

    Returns the merged frame with every missing value replaced by "".
    """
    all_price_data["Date"] = pd.to_datetime(all_price_data["Date"])

    for file_name in contracts_sorted_by_expiration:
        column = file_name.replace(".csv", "")

        # Load the contract's daily quotes and normalise the date column.
        raw_quotes = pd.read_csv(os.path.join(csv_dir, file_name))
        quotes = raw_quotes.iloc[:-1].rename(columns={"Time": "Date"})
        quotes["Date"] = pd.to_datetime(quotes["Date"])
        quotes["Price"] = quotes.apply(generate_price_data, axis=1)

        # Left join keeps every trading date, then name the column per contract.
        all_price_data = all_price_data.merge(
            quotes[["Date", "Price"]],
            on="Date",
            how="left",
            suffixes=("", f"_{column}"),
        ).rename(columns={"Price": f"{column}"})

    return all_price_data.fillna("")


def aggregate_price_data(data_dir, output_dir):
    """Combines every soybean contract CSV into one wide Parquet price table.

    Reads "trading_dates.parquet" from ``output_dir`` for the "Date" column,
    looks in ``data_dir`` for CSV files named after the seven contract symbols
    and the years 2000 through next calendar year, merges them with
    ``process_raw_price_data`` and writes "prices_soybeans_aggregate.parquet"
    to ``output_dir`` (without the index).
    """
    contract_symbols = ["SF", "SH", "SK", "SN", "SQ", "SU", "SX"]

    trading_dates = pd.read_parquet(os.path.join(output_dir, "trading_dates.parquet"))
    date_frame = pd.DataFrame(trading_dates, columns=["Date"])

    # Contracts are listed one year past today.
    last_year = pd.Timestamp.today().year + 1
    contract_years = list(range(2000, last_year + 1))

    csv_names = [entry for entry in os.listdir(data_dir) if entry.endswith(".csv")]
    ordered_names = get_sorted_contract_names(
        csv_names, contract_symbols, contract_years
    )

    merged = process_raw_price_data(data_dir, ordered_names, date_frame)
    merged.to_parquet(
        os.path.join(output_dir, "prices_soybeans_aggregate.parquet"), index=False
    )
    

In [7]:
def unpack_prices(price_json):
    """Decodes a JSON price string into a ``(high, low)`` pair of floats.

    Returns ``(None, None)`` when the input is empty, is ``'{}'``, is not valid
    JSON, does not decode to an object, lacks either the "high" or the "low"
    key, or holds values that cannot be converted to float. The result is
    therefore always either two floats or two ``None`` values.
    """
    if not price_json or price_json == "{}":
        return None, None

    try:
        price_dict = json.loads(price_json)
        high = price_dict.get("high")
        low = price_dict.get("low")
        # Check both values before converting: a conditional expression after
        # "float(high), float(low)" would bind to the second element only.
        if high is None or low is None:
            return None, None
        return float(high), float(low)
    except (json.JSONDecodeError, ValueError, TypeError, AttributeError):
        # AttributeError: the JSON decoded to something without .get (e.g. a list).
        return None, None


def determine_contract(month, year, date, wasde_date):
    """Picks the contract symbol for a month, before or after the WASDE date.

    Every month maps to a pair of contracts: the one used while ``date`` is
    earlier than ``wasde_date`` and the one used from ``wasde_date`` onwards.
    Symbols carry the last two digits of ``year``; the January contract chosen
    in November and December belongs to the following year. A month outside
    1-12 yields an empty string.
    """
    this_yy = str(year)[-2:]
    next_yy = str(int(year) + 1)[-2:]

    # month -> (symbol before the report, symbol from the report onwards),
    # for the months whose contracts both expire in the same calendar year.
    same_year_rolls = {
        1: ("SF", "SH"),
        2: ("SH", "SH"),
        3: ("SH", "SK"),
        4: ("SK", "SK"),
        5: ("SK", "SN"),
        6: ("SN", "SN"),
        7: ("SN", "SQ"),
        8: ("SQ", "SU"),
        9: ("SU", "SX"),
        10: ("SX", "SX"),
    }

    if month in same_year_rolls:
        before_symbol, after_symbol = same_year_rolls[month]
        before_report = f"{before_symbol}{this_yy}"
        after_report = f"{after_symbol}{this_yy}"
    elif month == 11:
        before_report, after_report = f"SX{this_yy}", f"SF{next_yy}"
    elif month == 12:
        before_report = after_report = f"SF{next_yy}"
    else:
        before_report = after_report = ""

    return after_report if date >= wasde_date else before_report

def process_continuous_data(trading_dates, wasde_dates, price_data):
    """Builds a continuous daily price series from the per-contract columns.

    Walks ``trading_dates["Date"]`` in order alongside
    ``wasde_dates["Report Date"]``. Whenever a trading date reaches or passes
    the pending report date, the active contract is re-selected with
    ``determine_contract`` from that trading date's month and year, and the
    next report date becomes pending. Once the report dates are exhausted the
    pending date stays at the last report date, so every later trading day
    re-selects the contract from its own month and year.

    For each trading date the active contract's JSON cell in ``price_data`` is
    decoded with ``unpack_prices``. Dates before the first report, dates
    missing from ``price_data``, contracts without a column in ``price_data``
    and cells without a usable high/low pair are skipped.

    Returns a DataFrame with columns "Date", "Contract", "High", "Low" and
    "Average", where "Average" is the midpoint of high and low.
    """
    continuous_data = []
    report_dates = wasde_dates["Report Date"]
    wasde_iter = iter(report_dates)
    current_wasde_date = next(wasde_iter, None)
    contract_name = None

    # Index the prices by date once instead of scanning the whole frame with a
    # boolean mask for every trading day; keep="first" mirrors taking the
    # first matching row.
    prices_by_date = price_data.drop_duplicates(subset="Date", keep="first").set_index(
        "Date"
    )

    for date in trading_dates["Date"]:
        if current_wasde_date is not None and date >= current_wasde_date:
            # Determine the contract before rolling over to the next WASDE date.
            contract_name = determine_contract(
                date.month, date.year, date, current_wasde_date
            )
            try:
                current_wasde_date = next(wasde_iter)
            except StopIteration:
                current_wasde_date = report_dates.iloc[-1]

        if not contract_name or contract_name not in prices_by_date.columns:
            # No contract selected yet, or no price file was loaded for it.
            continue
        if date not in prices_by_date.index:
            continue

        high, low = unpack_prices(prices_by_date.at[date, contract_name])
        if high is not None and low is not None:
            average = (high + low) / 2
            continuous_data.append([date, contract_name, high, low, average])

    return pd.DataFrame(
        continuous_data, columns=["Date", "Contract", "High", "Low", "Average"]
    )

def get_next_contracts(base_contract):
    """Lists seven consecutive contracts starting with ``base_contract``.

    ``base_contract`` is a symbol followed by a two-character year, e.g.
    "SQ24". The result starts with the base contract, continues through the
    remaining symbols of that year and wraps into the following year (two
    digits, modulo 100) until seven contracts are listed.

    Raises ValueError when the symbol part is not one of the seven known
    symbols.
    """
    symbols = ("SF", "SH", "SK", "SN", "SQ", "SU", "SX")

    yy = base_contract[-2:]
    start = symbols.index(base_contract[:-2])

    strip = [f"{symbol}{yy}" for symbol in symbols[start:]]

    # Starting at the first symbol already yields all seven; otherwise the
    # symbols skipped at the front are taken from the next year.
    if start > 0:
        following_yy = f"{(int(yy) + 1) % 100:02}"
        strip += [f"{symbol}{following_yy}" for symbol in symbols[:start]]

    return strip

def process_year_ahead_pricing_data(trading_dates, wasde_dates, price_data):
    """Builds daily prices for the active contract and the six that follow it.

    Walks ``trading_dates["Date"]`` in order alongside
    ``wasde_dates["Report Date"]``. Whenever a trading date reaches or passes
    the pending report date, the active contract is re-selected with
    ``determine_contract`` and expanded to seven contracts with
    ``get_next_contracts``; the next report date then becomes pending. Once the
    report dates are exhausted the pending date stays at the last report date,
    so every later trading day re-selects the contracts from its own month and
    year.

    For each trading date, every tracked contract's JSON cell in ``price_data``
    is decoded with ``unpack_prices``. Dates before the first report, dates
    missing from ``price_data``, contracts without a column in ``price_data``
    and cells without a usable high/low pair are skipped.

    Returns a DataFrame with columns "Date", "Contract", "High", "Low" and
    "Average" holding up to seven rows per trading date; "Average" is the
    midpoint of high and low.
    """
    continuous_data_ext = []
    report_dates = wasde_dates["Report Date"]
    wasde_iter = iter(report_dates)
    current_wasde_date = next(wasde_iter, None)
    contract_collection = []

    # Index the prices by date once instead of scanning the whole frame with a
    # boolean mask for every date/contract pair; keep="first" mirrors taking
    # the first matching row.
    prices_by_date = price_data.drop_duplicates(subset="Date", keep="first").set_index(
        "Date"
    )

    for date in trading_dates["Date"]:
        if current_wasde_date is not None and date >= current_wasde_date:
            contract_name = determine_contract(
                date.month, date.year, date, current_wasde_date
            )
            contract_collection = get_next_contracts(contract_name)
            try:
                current_wasde_date = next(wasde_iter)
            except StopIteration:
                current_wasde_date = report_dates.iloc[-1]

        if date not in prices_by_date.index:
            continue

        for contract in contract_collection:
            if not contract or contract not in prices_by_date.columns:
                # No price file was loaded for this contract.
                continue

            high, low = unpack_prices(prices_by_date.at[date, contract])
            if high is not None and low is not None:
                average = (high + low) / 2
                continuous_data_ext.append([date, contract, high, low, average])

    return pd.DataFrame(
        continuous_data_ext, columns=["Date", "Contract", "High", "Low", "Average"]
    )

def generate_continuous_price_data(data_path, output_dir):
    """Writes the continuous and the year-ahead price series to ``output_dir``.

    Inputs are "prices_soybeans_aggregate.parquet" and "trading_dates.parquet"
    from ``output_dir`` plus the WASDE report table at ``data_path`` (its
    "Report Date" column is used). Outputs, both written without the index:
      * "prices_soybeans_continuous.parquet" from ``process_continuous_data``
      * "prices_soybeans_continuous_ext.parquet" from
        ``process_year_ahead_pricing_data``
    """

    def in_output_dir(file_name):
        return os.path.join(output_dir, file_name)

    price_data = pd.read_parquet(in_output_dir("prices_soybeans_aggregate.parquet"))
    trading_dates = pd.read_parquet(in_output_dir("trading_dates.parquet"))
    wasde_dates = pd.read_parquet(data_path)

    # Both date columns are compared against each other, so parse them alike.
    trading_dates["Date"] = pd.to_datetime(trading_dates["Date"])
    wasde_dates["Report Date"] = pd.to_datetime(wasde_dates["Report Date"])

    front_series = process_continuous_data(trading_dates, wasde_dates, price_data)
    front_series.to_parquet(
        in_output_dir("prices_soybeans_continuous.parquet"), index=False
    )

    year_ahead_series = process_year_ahead_pricing_data(
        trading_dates, wasde_dates, price_data
    )
    year_ahead_series.to_parquet(
        in_output_dir("prices_soybeans_continuous_ext.parquet"), index=False
    )

#### Pricing Data- Execution 


In [8]:
# data_dir holds the per-contract price CSV files. output_dir receives the
# generated Parquet files and must already contain "trading_dates.parquet",
# which both functions below read from it.
data_dir = "../data/raw/historical_prices/soybeans"
output_dir = "../data/interim/"
wasde_path = "../data/interim/wasde_soybeans.parquet"

# Build the wide per-contract price table first; the continuous series are
# derived from the file it writes.
aggregate_price_data(data_dir, output_dir)
generate_continuous_price_data(wasde_path, output_dir)

### Aggregate All Model Input Data


In [9]:
price_path = "../data/interim/prices_soybeans_continuous.parquet"
data_path = "../data/processed/soybeans_model_training_data.parquet"
output_csv_path = "../data/processed/soybeans_model_training_data.csv"

# Number of daily price rows, counted from a report date, that are summarised
# for that report.
MAX_PRICE_ROWS_PER_REPORT = 15


def _summarize_price_window(prices, start_date, end_date=None,
                            max_rows=MAX_PRICE_ROWS_PER_REPORT):
    """Summarises the first ``max_rows`` price rows dated in [start_date, end_date).

    ``end_date=None`` leaves the window open-ended. Returns
    ``(high, low, average, averages)``: the maximum "High", the minimum "Low"
    and the mean "Average" of the selected rows as floats, plus the list of
    their "Average" values. An empty window yields three NaN values and an
    empty list.
    """
    in_window = prices["Date"] >= start_date
    if end_date is not None:
        in_window = in_window & (prices["Date"] < end_date)

    window = prices[in_window].head(max_rows)
    if window.empty:
        return float("nan"), float("nan"), float("nan"), []

    return (
        float(window["High"].max()),
        float(window["Low"].min()),
        float(window["Average"].mean()),
        window["Average"].tolist(),
    )


processed_data = pd.read_parquet(data_path)
daily_price_data = pd.read_parquet(price_path)

# Standardize date columns to datetime.date
processed_data.rename(columns={"Report Date": "Date"}, inplace=True)
processed_data["Date"] = pd.to_datetime(processed_data["Date"]).dt.date
daily_price_data["Date"] = pd.to_datetime(daily_price_data["Date"]).dt.date

# Each report's window runs from its own date up to (not including) the next
# report date; the last report has no successor, so its window is open-ended.
report_dates = processed_data["Date"].tolist()
window_ends = report_dates[1:] + [None]

# Initialize lists to store results (one entry per report row, in row order)
price_high_list = []
price_low_list = []
price_average_list = []
price_collections = []

for start_date, end_date in zip(report_dates, window_ends):
    price_high, price_low, price_average, price_array = _summarize_price_window(
        daily_price_data, start_date, end_date
    )
    price_high_list.append(price_high)
    price_low_list.append(price_low)
    price_average_list.append(price_average)
    price_collections.append(price_array)

# Add the new columns to the DataFrame
processed_data["Price_High"] = price_high_list
processed_data["Price_Low"] = price_low_list
processed_data["Price_Average"] = price_average_list
processed_data["Average_Price_Collection"] = price_collections

# Save the result. The Parquet file at data_path (the input read above) is
# overwritten with the enriched table; a CSV copy is written next to it.
processed_data.to_parquet(data_path)
processed_data.to_csv(output_csv_path, index=False)