# Data Download using Azure FarmBeats
Download of required satellite and weather data using Azure FarmBeats PaaS.

In [None]:
import sys
print(sys.executable)
print (sys.version)

In [None]:
# Check packages installed or not!
import geopandas
import rasterio
from azure.farmbeats.models import Farmer
import xarray

### Import 3rd party libraies

In [None]:
# Disable unnecessary logs 
import sys
import logging
logging.disable(sys.maxsize)

from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import json
import numpy as np
import os
import pandas as pd
import rasterio

### Import Farmbeats and Utilities

In [None]:
from azure.farmbeats.models import Farmer
from utils.config import farmbeats_config

from utils.weather_util import fetch_weather, get_weather
from utils.satellite_util import (
    satellite_job_request,
    queue_sat_data,
    get_sat_file_paths
)

from c_funcs.constants import CONSTANTS

### Farmbeats Configuration

In [None]:
# FarmBeats Client definition
FB_Client = FarmbeatsClient(
        instance_url=farmbeats_config["instance_url"],
        tenant_id=farmbeats_config["tenant_id"],
        client_id=farmbeats_config["client_id"],
        client_secret=farmbeats_config["client_secret"],
        authority=farmbeats_config["authority"],
        scope=farmbeats_config["default_scope"],
    )

# Start and End data for Satellite and Weather data to be pulled
start_dt = datetime.strptime(CONSTANTS["interp_date_start"], "%d-%m-%Y")
end_dt = datetime.strptime(CONSTANTS["interp_date_end"], "%d-%m-%Y")

### Read Farm Boundaries

In [None]:
# Read 1000 farm geojsons from farms_1kmx1km.csv
locations_df = pd.read_csv("farms_sample_1kmx1km,csv")
locations_df["farms1"] = locations_df.farms.apply(
    json.loads
)  # farm geojsons converted from string to list with numeric elements

TOKEN = 5034
NO_FARMS = 1

In [None]:
farmzip = zip(locations_df.farms1.values[:NO_FARMS], [str(TOKEN)])

In [None]:
farmer_details = {
    "farmer_name": "dimattap",
    "TOKEN": 1986,
    "farmer_id": "dimattap_sdk_farmer_1986",
}


# Farmer object creation if not yet created
farmer = FB_Client.farmers.create(
    farmer_id=farmer_details["farmer_id"],
    farmer=Farmer(
        name=farmer_details["farmer_name"]
        + "'s SDK Farmer "
        + str(farmer_details["TOKEN"])
    ),
)

In [None]:
def satellite_job_request_farmcode(farm_code, farm_geojson):
    # function for satellite job request just using farm_code and geojson
    # other information will be derived locally
    try:
        sjr = satellite_job_request(
            FB_Client=FB_Client,
            farm_geojson=np.array(farm_geojson),
            farm_code=farm_code,
            farm_id=farmer_details["farmer_name"] + "_sdk_farm_" + farm_code,
            field_id=farmer_details["farmer_name"] + "_sdk_field_" + farm_code,
            boundary_id=farmer_details["farmer_name"] + "_sdk_boundary_" + farm_code,
            farmer_id=farmer_details["farmer_id"],
            out_file=CONSTANTS["sat_job_ids"],
            farmer_name=farmer_details["farmer_name"],
            start_dt=start_dt,
            end_dt=end_dt,
        )
    except Exception as e:
        print(e.message)
        return e
    return sjr

def get_weather_farmcode(farm_code):
    # function for queueing weather job in FarmBeats using only farm_code
    # other reuired information will be derived locally
    gwf = get_weather(
        FB_Client=FB_Client,
        farmer_id=farmer_details["farmer_id"],
        field_id=farmer_details["farmer_name"] + "_sdk_field_" + farm_code,
        start_dt=start_dt,
        end_dt=end_dt,
        api_name="dailyhistorical",
        name="Downloading 1000 farms",
        config=config,
    )
    return gwf

def fetch_weather_farmcode(farm_code):
    fwf = fetch_weather(
        FB_Client=FB_Client,
        farmer_id=farmer_details["farmer_id"],
        field_id=farmer_details["farmer_name"] + "_sdk_field_" + farm_code,
        start_dt=start_dt,
        end_dt=end_dt,
        weather_data_type="historical",
        out_folder=CONSTANTS["weather_data_fldr"],
    )
    return fwf

### Submit Satellite Jobs

In [None]:
# Create Satellite job for 1000 farms, the job ID will be saved in o_file
with ThreadPoolExecutor(max_workers=100) as executor:
    futures = [
        executor.submit(satellite_job_request_farmcode, farm_code, farm_geojson)
        for farm_geojson, farm_code in farmzip
    ]

In [None]:
# Get satellite job id and status
futures[0].result().serialize()['id']

### Save satellite job ids

In [None]:
df_sat_jobid = pd.read_csv(
    CONSTANTS["sat_job_ids"],
    names=[
        "farm_code",
        "polygon",
        "id",
        "farm_id",
        "field_id",
        "boundary_id",
        "farmer_id",
    ],
)

In [None]:
df_sat_jobid.head(10)

### Check Status of Satellite Jobs

In [None]:
# Wait for Satellite job and weather job untill get succeeded
with ThreadPoolExecutor() as executor:
    wait_funcs = [
        lambda: FB_Client.jobs.wait_for_job(
            job_id=futures[0].result().serialize()['id']
        )
    ]
    futures = [executor.submit(func) for func in wait_funcs]

# Check for Job status
satellite_job_terminal_state = [f.result() for f in futures]

### Submit Weather Jobs

In [None]:
# Start weather job for all Farms with Satellite data available
with ThreadPoolExecutor(max_workers=10) as executor:
    weathers = [
        executor.submit(get_weather_farmcode, str(farm_code))
        for farm_code in df_sat_jobid.farm_code.values
    ]

### Check status of Weather Jobs

In [None]:
# Wait for Satellite job and weather job untill get succeeded
with ThreadPoolExecutor() as executor:
    wait_funcs = [
        lambda: FB_Client.jobs.wait_for_job(
            job_id=weathers[-1].result().serialize()['id']
        )
    ]
    futures = [executor.submit(func) for func in wait_funcs]

# Check for Job status
satellite_job_terminal_state = [f.result() for f in futures]

### Download Satellite Data to Compute

In [None]:
"""
Download of Satellite Data and weather data from FarmBeats
"""
# make sure that download job has been completed

try:
    df_ndvi = get_sat_file_paths(
        FB_Client=FB_Client,
        boundary_ids=[df_sat_jobid.boundary_id.values[-1]],
        farmer_id=farmer_details["farmer_id"],
        start_dt=start_dt,
        end_dt=end_dt,
        band_name=CONSTANTS["var_name"].upper()
    )
except Exception as e:
    print(e.response.body())

df_ndvi.to_csv(CONSTANTS["sat_file_paths"])  # save NDVI file locations to csv

### Download Weather Data to Compute

In [None]:
# Fetch weather data for Farms which has complete job from get_weather. The data will be saved to CSV file in weather_fldr with name fieldid.csv
with ThreadPoolExecutor(max_workers=100) as executor:
    weathers_fetch = [
        executor.submit(fetch_weather_farmcode, str(frmcd))
        for frmcd in df_sat_jobid.farm_code.values
    ]

In [None]:
try:  
    scenes = FB_Client.scenes.get_all(
        farmer_id=farmer_details["farmer_id"],
        boundary_id="dimattap_sdk_boundary_"+str(TOKEN),
        start_date=start_dt,
        end_date=end_dt,
    )
except Exception as e:
    print(e.response.body())
    
print("Total Scenes Downloaded:", len(scenes))

weather_data = FB_Client.weatherdata.get_weather_data(
    farmer_id=farmer_details["farmer_id"],
    field_id="dimattap_sdk_field_" + str(TOKEN),
    start_date_time=start_dt,
    end_date_time=end_dt,
    extension_id="DTN.ClearAg",
    weather_data_type="historical",
    granularity="daily",
)

print("Total Weather points Downloaded:", len(weather_data))

### Visualize Band Files

In [None]:
filepaths= df_ndvi.filePath.values[:2]
print(filepaths)

In [None]:
from rasterio.plot import show
from rasterio.plot import show_hist
import matplotlib.pyplot as plt

with rasterio.open(filepaths[0]) as src:
    nir = src.read(1)

plt.imshow(nir)
plt.colorbar()
plt.title('NDVI')
plt.xlabel('Column #')
plt.ylabel('Row #')

In [None]:
nir.shape