<a href="https://colab.research.google.com/github/bhandrigan/ag_trends/blob/main/ag_trends_us_states.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pytrends google-ads scrapingbee  shapely>=2.0.0 -q

In [None]:
# @title Load libraries and functions
import geopandas as gpd
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.colors import LinearSegmentedColormap
import numpy as np
import requests
import gzip
from io import BytesIO
from pytrends.request import TrendReq
from google.ads.googleads.client import GoogleAdsClient
from google.api_core.exceptions import ResourceExhausted
from requests.exceptions import RequestException
from requests.exceptions import ReadTimeout
import time
import os
import zipfile
from urllib.parse import urlparse
import tempfile
import json
from google.colab import userdata
import zipfile
import re
import random
from scrapingbee import ScrapingBeeClient
import shapely
from shapely.geometry import shape
import multiprocessing as mp
import unicodedata




scrapingbee_api_key = userdata.get('SCRAPING_BEE')
gads_sa = json.loads(userdata.get('N90_GADS_SA_ACCOUNT_JSON'))
gads_api_key = userdata.get('N90_GADS_API_KEY')
sa_account = tempfile.NamedTemporaryFile(delete=False)
sa_account.write(json.dumps(gads_sa).encode())
sa_account.close()
sa_account_path = sa_account.name

gads_account = "8417741864"
use_proto_plus = True
impersonated_email = "api@n90.co"


# create google-ads.yaml file text
yaml_content = f"""
developer_token: {gads_api_key}
use_proto_plus: {use_proto_plus}
json_key_file_path: {sa_account_path}
impersonated_email: {impersonated_email}
login_customer_id: {gads_account}
"""

# Initialize the client with the dictionary configuration (hypothetical)
client = GoogleAdsClient.load_from_string(yaml_content)


def normalize_text(text):
    text = unicodedata.normalize('NFKD', text)
    return ''.join(c for c in text if not unicodedata.combining(c)).upper()

def fetch_unzip_load_shapefile_flexible(url, output_dir):
    """
    Fetches a zip file containing shapefiles from a URL,
    unzips it if needed, and loads the first .shp file found
    in the output directory into a GeoDataFrame.

    Removes query string variables from the local filename.

    Args:
        url (str): The URL of the zip file containing the shapefiles.
        output_dir (str): The directory where the zip file will be downloaded
                          and unzipped.

    Returns:
        gpd.GeoDataFrame: The loaded shapefile as a GeoDataFrame, or None if
                          download, unzipping, or loading fails, or if no
                          .shp files are found.
    """
    # Ensure the output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Parse the URL to get the path component
    parsed_url = urlparse(url)
    # Get the base filename from the URL path and remove query string
    zip_filename = os.path.basename(parsed_url.path)
    zip_file_path = os.path.join(output_dir, zip_filename)

    # Check if the zip file already exists (indicates previous download/unzip)
    if os.path.exists(zip_file_path):
        print(f"Zip file '{zip_filename}' already exists in '{output_dir}'. Skipping download.")
    else:
        # If the zip file doesn't exist, attempt to download
        print(f"Zip file '{zip_filename}' not found. Attempting to download from {url}...")
        try:
            response = requests.get(url, stream=True, verify=False)
            response.raise_for_status()  # Raise an HTTPError for bad responses

            # Download the zip file
            with open(zip_file_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
            print(f"Zip file downloaded to '{zip_file_path}'.")

        except requests.exceptions.RequestException as e:
            print(f"Error downloading file: {e}")
            return None

    # Check if .shp files already exist in the output directory
    shp_files = [f for f in os.listdir(output_dir) if f.endswith('.shp')]
    if shp_files:
        print(f"Shapefile(s) found in '{output_dir}'. Loading the first one...")
        shapefile_name_to_load = shp_files[0]  # Load the first .shp file found
        shape_file_path = os.path.join(output_dir, shapefile_name_to_load)
        try:
            gdf = gpd.read_file(shape_file_path)
            print(f"Shapefile '{shapefile_name_to_load}' loaded successfully.")
            return gdf
        except Exception as e:
            print(f"Error loading shapefile '{shapefile_name_to_load}': {e}")
            return None
    else:
        # If no .shp files are found, attempt to unzip
        print(f"No .shp files found in '{output_dir}'. Attempting to unzip '{zip_filename}'...")
        try:
            with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
                zip_ref.extractall(output_dir)
            print(f"Zip file extracted to '{output_dir}'.")

            # After unzipping, look for .shp files again
            shp_files_after_unzip = [f for f in os.listdir(output_dir) if f.endswith('.shp')]
            if shp_files_after_unzip:
                print(f"Shapefile(s) found after unzipping. Loading the first one...")
                shapefile_name_to_load = shp_files_after_unzip[0]
                shape_file_path = os.path.join(output_dir, shapefile_name_to_load)
                try:
                    gdf = gpd.read_file(shape_file_path)
                    print(f"Shapefile '{shapefile_name_to_load}' loaded successfully.")
                    return gdf
                except Exception as e:
                    print(f"Error loading shapefile '{shapefile_name_to_load}': {e}")
                    return None
            else:
                print(f"No .shp files found in '{output_dir}' after unzipping.")
                return None

        except zipfile.BadZipFile as e:
            print(f"Error unzipping file (Bad zip file): {e}")
            return None
        except Exception as e:
            print(f"An unexpected error occurred during unzipping or loading: {e}")
            return None



# Build request to fetch keyword plan metrics by location
def get_keyword_estimates(client, keywords, geo_ids):
    keyword_plan_service = client.get_service("KeywordPlanIdeaService")

    results = []

    for state, geo_id in geo_ids.items():
        request = {
            "customer_id": client.login_customer_id,
            "language": "languageConstants/1000",  # English
            "geo_target_constants": [f"geoTargetConstants/{geo_id}"],
            "keyword_plan_network": client.enums.KeywordPlanNetworkEnum.GOOGLE_SEARCH_AND_PARTNERS,
            "keyword_seed": {"keywords": keywords},
        }

        try:
            response = keyword_plan_service.generate_keyword_ideas(request=request)

            for result in response:
                results.append({
                    'state': state,
                    'keyword': result.text,
                    'avg_monthly_searches': result.keyword_idea_metrics.avg_monthly_searches,
                    'competition': result.keyword_idea_metrics.competition.name,
                })
        except Exception as e:
            print(f"Error in state {state} with geo ID {geo_id}: {e}")

    return pd.DataFrame(results)




def get_repair_keyword_ideas(client, seed_keywords, geo_id='2840', language_id='1000', suggestions_per_keyword=10, retries=5):
    """
    Fetch keyword ideas per seed keyword, salvaging results if API limit reached.

    Args:
        client: GoogleAdsClient instance.
        seed_keywords (list): Initial seed keywords.
        geo_id (str): Geo target ID ('2840' for US).
        language_id (str): Language ID ('1000' for English).
        suggestions_per_keyword (int): Top suggestions per seed keyword.
        retries (int): Retries on API rate-limit errors.

    Returns:
        pd.DataFrame: Collected keyword suggestions and metrics.
    """
    keyword_plan_service = client.get_service("KeywordPlanIdeaService")
    results = []

    for seed_keyword in seed_keywords:
        request = {
            "customer_id": client.login_customer_id,
            "language": f"languageConstants/{language_id}",
            "geo_target_constants": [f"geoTargetConstants/{geo_id}"],
            "keyword_plan_network": client.enums.KeywordPlanNetworkEnum.GOOGLE_SEARCH_AND_PARTNERS,
            "keyword_seed": {"keywords": [seed_keyword]},
            "page_size": suggestions_per_keyword
        }

        attempt = 0
        wait_time = 5  # Initial backoff in seconds

        while attempt <= retries:
            try:
                response = keyword_plan_service.generate_keyword_ideas(request=request)

                for result in response:
                    results.append({
                        'seed_keyword': seed_keyword,
                        'suggested_keyword': result.text,
                        'avg_monthly_searches': result.keyword_idea_metrics.avg_monthly_searches,
                        'competition': result.keyword_idea_metrics.competition.name,
                    })
                # Successfully retrieved results; break retry loop
                break

            except ResourceExhausted:
                print(f"Rate limit hit fetching '{seed_keyword}'. Attempt {attempt + 1}/{retries}. Retrying in {wait_time}s...")
                time.sleep(wait_time)
                wait_time *= 2  # Exponential backoff
                attempt += 1

        if attempt > retries:
            print(f"Exceeded max retries for '{seed_keyword}'. Moving to next keyword.")

    # Return all successfully collected results even if some requests failed
    return pd.DataFrame(results)

def prep_paid_search_data(df, geo_col='state'):
    df = df.groupby(geo_col).sum('avg_monthly_searches')
    df = df.reset_index()
    df['state'] = df['state'].str.upper()
    total_searches = df['avg_monthly_searches'].sum()
    df['paid_search_composite_factor_100'] = (df['avg_monthly_searches'] / total_searches) * 100
    return df

def get_trends_via_scrapingbee(
        keyword_list, scrapingbee_api_key, language='en-US', tz=360, geo='US',
        resolution='REGION', timeframe='today 12-m', inc_low_vol=True,
        max_retries=4, initial_wait=10, timeout=(10, 30)):  # increased timeouts clearly

    proxy_url = f"http://{scrapingbee_api_key}:@proxy.scrapingbee.com:8886"
    proxies = [proxy_url]
    dfs = []

    for kw in keyword_list:
        retries = 0
        wait_time = initial_wait

        while retries <= max_retries:
            try:
                pytrends = TrendReq(
                    hl=language,
                    tz=tz,
                    proxies=proxies,
                    timeout=timeout,
                    requests_args={'verify': False}
                )

                pytrends.build_payload([kw], geo=geo, timeframe=timeframe)
                df_kw = pytrends.interest_by_region(resolution=resolution, inc_low_vol=inc_low_vol)
                df_kw.rename(columns={kw: kw.replace(' ', '_')}, inplace=True)
                dfs.append(df_kw)
                print(f"[Success] '{kw}' retrieved successfully.")
                break

            except (RequestException, ReadTimeout) as e:
                error_str = str(e).lower()
                if '429' in error_str or 'too many requests' in error_str or 'timeout' in error_str:
                    print(f"[Retryable Error] '{kw}' - Retrying in {wait_time}s (attempt {retries+1}/{max_retries}): {e}")
                    jitter = random.uniform(2, 5)
                    time.sleep(wait_time + jitter)
                    wait_time *= 2
                    retries += 1
                else:
                    print(f"[Critical Error] '{kw}' - Non-retryable error: {e}")
                    raise

        if retries > max_retries:
            print(f"[Skipped] Exceeded maximum retries for keyword '{kw}'.")

        sleep_between_keywords = random.uniform(5, 10)
        print(f"Waiting {sleep_between_keywords:.1f}s before next keyword...")
        time.sleep(sleep_between_keywords)

    if dfs:
        df_combo = pd.concat(dfs, axis=1)
        print("Data successfully retrieved for some/all keywords.")
    else:
        df_combo = pd.DataFrame()
        print("No data retrieved for keywords after retries.")

    return df_combo

def get_trends(keyword_list, language='en-US', tz=360, geo='US', resolution='REGION',
               timeframe='today 12-m', inc_low_vol=True, max_retries=5, initial_wait=10):

    dfs = []

    for kw in keyword_list:
        retries = 0
        wait_time = initial_wait

        while retries <= max_retries:
            try:
                # Create pytrends instance without internal retries
                pytrends = TrendReq(hl=language, tz=tz)

                pytrends.build_payload([kw], geo=geo, timeframe=timeframe)
                df_kw = pytrends.interest_by_region(resolution=resolution, inc_low_vol=inc_low_vol)
                df_kw.rename(columns={kw: kw.replace(' ', '_')}, inplace=True)
                dfs.append(df_kw)
                print(f"Successfully retrieved data for keyword '{kw}'")
                break  # Exit loop on success

            except RequestException as e:
                error_str = str(e).lower()
                if '429' in error_str or 'too many requests' in error_str:
                    print(f"429 error on keyword '{kw}' - retrying in {wait_time}s "
                          f"(attempt {retries+1}/{max_retries})...")
                    jitter = random.uniform(1, 5)
                    time.sleep(wait_time + jitter)
                    wait_time *= 2
                    retries += 1
                else:
                    print(f"Non-retryable error: {e}")
                    raise

        if retries > max_retries:
            print(f"Exceeded maximum retries for keyword '{kw}'. Skipping keyword.")

        sleep_duration = random.uniform(5, 10)
        print(f"Pausing for {sleep_duration:.1f}s before next keyword...")
        time.sleep(sleep_duration)

    if dfs:
        df_combo = pd.concat(dfs, axis=1)
    else:
        df_combo = pd.DataFrame()
        print("Warning: No keyword data successfully retrieved.")

    return df_combo

def prep_trends_data(df):
    # df['st_composite_sum'] = 0
    df['st_composite_sum'] = (df.sum(axis=1))

    df['st_composite_factor_100'] = ((
        (df['st_composite_sum'] ) /
        (df['st_composite_sum'].sum() )
    ) * 100)
    df.loc[df['st_composite_factor_100'] > 5, 'st_composite_factor_100'] = 5
    df['st_composite_factor_100'] = df['st_composite_factor_100'].fillna(0)
    df = df.reset_index()
    # df.rename(columns={'geoName': 'region'}, inplace=True)
    df['region'] = df['region'].str.upper()
    return df

def add_us_stats_and_geos(gdf, df_trends, df_stats, paid_df, gdf_geo_col='NAME', trends_geo_col='region', stats_geo_col='STATE_NAME', paid_geo_col='state', counts_col=None):
    if counts_col is None:
        return "Error: counts_col must be provided."
    gdf[gdf_geo_col] = gdf[gdf_geo_col].apply(normalize_text)
    df_trends[trends_geo_col] = df_trends[trends_geo_col].apply(normalize_text)
    df_stats[stats_geo_col] = df_stats[stats_geo_col].apply(normalize_text)
    paid_df[paid_geo_col] = paid_df[paid_geo_col].apply(normalize_text)
    merged_df = gdf.merge(df_trends, left_on=gdf_geo_col, right_on=trends_geo_col, how='left')
    merged_df = pd.merge(merged_df, df_stats, left_on=gdf_geo_col, right_on=stats_geo_col, how='left', suffixes=('', '_extra'))
    for col in merged_df.columns:
        if col.endswith('_extra'):
            merged_df.drop(columns=[col], inplace=True)

    merged_df['Ops_below_250k'] = merged_df['Ops_below_250k'].fillna(0).round(0).astype(int)
    merged_df['Ops_250k_or_more'] = merged_df['Ops_250k_or_more'].fillna(0).round(0).astype(int)
    merged_df['Total_Ops'] = merged_df['Total_Ops'].fillna(0).round(0).astype(int)
    comp_col_label = f"{counts_col}_composite_factor_100"
    merged_df[comp_col_label] = (merged_df[counts_col] / merged_df[counts_col].sum()) * 100
    # merged_df['Ops_below_250k_composite_factor_100'] = (merged_df['Ops_below_250k'] / merged_df['Ops_250k_or_more'].sum()) * 100
    merged_df2 = merged_df.merge(paid_df, left_on=gdf_geo_col, right_on=paid_geo_col, how='left').copy().sort_values(gdf_geo_col).reset_index(drop=True)
    # merged_df2 = merged_df2.loc[~merged_df2[paid_geo_col].isna()].copy().sort_values(gdf_geo_col).reset_index(drop=True)


    mean_trends = merged_df2['st_composite_factor_100'].fillna(0).mean()
    mean_volume = merged_df2['paid_search_composite_factor_100'].fillna(0).mean()

    # Clearly calculate relative positions
    merged_df2['trends_relative'] = merged_df2['st_composite_factor_100'] / mean_trends
    merged_df2['volume_relative'] = merged_df2['paid_search_composite_factor_100'] / mean_volume

    # Clearly combine both into single adjustment factor
    merged_df2['combined_relative_factor'] = (merged_df2['trends_relative'] + merged_df2['volume_relative']) / 2

    # Adjusted audience clearly calculated
    audience_label = f"adjusted_audience_{counts_col}"
    merged_df2[audience_label] = (merged_df2[counts_col] * merged_df2['combined_relative_factor']).fillna(0).round(0).astype(int)
    return merged_df2

def reposition_alaska_hawaii(gdf):
    # Separate states
    contiguous_us = gdf[~gdf['NAME'].isin(['ALASKA', 'HAWAII'])]
    alaska = gdf[gdf['NAME'] == 'ALASKA'].copy()
    hawaii = gdf[gdf['NAME'] == 'HAWAII'].copy()
    # Adjusted scale and translate for Alaska and Hawaii
    alaska.geometry = alaska.scale(xfact=0.4, yfact=0.4, origin='center').translate(xoff=1500000, yoff=-4400000)
    hawaii.geometry = hawaii.scale(xfact=0.7, yfact=0.7, origin='center').translate(xoff=5200000, yoff=-1700000)
    # Combine adjusted geometries
    repositioned_us = pd.concat([contiguous_us, alaska, hawaii])
    # and remove all other islands like puerto rico
    repositioned_us = repositioned_us[~(repositioned_us['STATEFP'] >= '60')].sort_values('NAME').reset_index(drop=True)
    return repositioned_us

def plot_us_map(gdf, _density_cmap, _column, _legend_kwds, _title, _footer, _vmax=50000, projection=None):
    if projection is None:
        fig, ax = plt.subplots(figsize=(10, 5))
        # ax.set_aspect('equal')
    else:
        fig, ax = plt.subplots(figsize=(10, 5), subplot_kw={'projection': projection})
    # fig, ax = plt.subplots(figsize=(10, 5))
    gdf.plot(ax=ax, column=_column, cmap=density_cmap, legend=True, vmin=0, vmax=_vmax,
                legend_kwds=_legend_kwds)
    ax.axis('off')
    ax.set_title(_title, fontsize=14, pad=20, horizontalalignment= 'center') # Increased padding
    fig.subplots_adjust(bottom=0.1) # Adjust the bottom margin

    # Add footer
    fig.text(0.5, 0.01, _footer, ha='center', fontsize=10)

    plt.show()

def fetch_google_ads_geo_targets(client, country_codes=['US', 'CA']):
    ga_service = client.get_service("GoogleAdsService")

    country_codes_formatted = ', '.join(f"'{code}'" for code in country_codes)

    query = f"""
        SELECT
            geo_target_constant.resource_name,
            geo_target_constant.id,
            geo_target_constant.name,
            geo_target_constant.country_code,
            geo_target_constant.target_type
        FROM geo_target_constant
        WHERE geo_target_constant.country_code IN ({country_codes_formatted})
        AND geo_target_constant.target_type IN ('State', 'Province', 'Country')
    """

    response = ga_service.search_stream(customer_id=client.login_customer_id, query=query)

    geo_targets = []
    for batch in response:
        for row in batch.results:
            geo_targets.append({
                'resource_name': row.geo_target_constant.resource_name,
                'id': row.geo_target_constant.id,
                'name': row.geo_target_constant.name,
                'country_code': row.geo_target_constant.country_code,
                'target_type': row.geo_target_constant.target_type,
            })

    return pd.DataFrame(geo_targets)

def load_or_download_csv(local_filename, data_url):
    """
    Loads a CSV file locally or downloads it if not available locally.
    Handles CSV files compressed in ZIP, GZIP, or uncompressed formats.

    Args:
        local_filename (str): Path to the local CSV file.
        data_url (str): URL to download the CSV from if not available locally.

    Returns:
        pd.DataFrame: Loaded dataframe from CSV.
    """
    if os.path.exists(local_filename):
        try:
            df = pd.read_csv(local_filename)
            print(f"Loaded data successfully from {local_filename}.")
            return df
        except Exception as e:
            print(f"Error loading local file, will attempt download: {e}")

    print(f"Downloading data from {data_url}...")
    response = requests.get(data_url, stream=True)
    response.raise_for_status()

    content_type = response.headers.get('Content-Type', '').lower()

    # Handle ZIP compressed files
    if 'zip' in content_type or data_url.lower().endswith('.zip'):
        with zipfile.ZipFile(BytesIO(response.content)) as z:
            csv_files = [name for name in z.namelist() if name.endswith('.csv')]
            if not csv_files:
                raise ValueError("No CSV file found in ZIP archive.")
            with z.open(csv_files[0]) as f:
                df = pd.read_csv(f)

    # Handle GZIP compressed files
    elif 'gzip' in content_type or data_url.lower().endswith('.gz'):
        with gzip.open(BytesIO(response.content), 'rt') as f:
            df = pd.read_csv(f)

    # Handle regular CSV files
    else:
        df = pd.read_csv(BytesIO(response.content))

    # Save locally
    df.to_csv(local_filename, index=False)
    print(f"Data downloaded and saved locally to {local_filename}.")

    return df


# Clearly define Canadian sales categories explicitly
below_250k_categories_canada = [
    '$0',
    '$1 to $9,999',
    '$10,000 to $24,999',
    '$25,000 to $49,999',
    '$50,000 to $99,999',
    '$100,000 to $249,999'
]

above_250k_categories_canada = [
    '$250,000 to $499,999',
    '$500,000 to $999,999',
    '$1,000,000 to $1,999,999',
    '$2,000,000 and over'
]

# Function to categorize revenues explicitly
def categorize_revenues(df, geo_level, geo_col, country_geo_val, above_250k_cats, below_250k_cats, country, pivot_col='Total farm revenues distribution'):
    if country == 'CA':
        df_filtered = df.loc[df[geo_col] != country_geo_val] if geo_level != 'national' else df.loc[df[geo_col] == country_geo_val]
        df_filtered = df[(df[geo_col] != country_geo_val) & ~(df[geo_col].str.contains(','))] if geo_level == 'provincial' else df_filtered
        # Example assuming your dataframe is df:
        summary_df = df_filtered.pivot_table(
            index=geo_col,
            columns=pivot_col,
            values='VALUE',
            aggfunc='sum',
            fill_value=0
        ).reset_index()

        # Explicit aggregation for below and above $250k
        summary_df['Ops_below_250k'] = summary_df[below_250k_cats].sum(axis=1)
        summary_df['Ops_250k_or_more'] = summary_df[above_250k_cats].sum(axis=1)
        summary_df['geo_code'] = summary_df[geo_col].apply(lambda x: re.findall(r'\[(.*?)\]', x)[0])
        summary_df['geo_name'] = summary_df[geo_col].apply(lambda x: x.split('[')[0].strip()).str.upper()

        summary_df['Total_Ops'] = summary_df['Ops_below_250k'] + summary_df['Ops_250k_or_more']
        summary_df['PRUID'] = (
            summary_df['geo_code']
            .str.replace('PR', '', regex=False)  # clearly remove 'PR' if present
            .str[:2]                             # clearly take the first two characters
            .astype(int)                         # clearly convert to integer
        )
        final_columns = ['PRUID','geo_name', 'geo_code', 'Ops_below_250k', 'Ops_250k_or_more', 'Total_Ops']
        summary_df = summary_df[final_columns]
        summary_df.columns = final_columns
        summary_df = summary_df.copy().sort_values('PRUID').reset_index(drop=True)
        return summary_df[final_columns].reset_index(drop=True)

    else:
        print('not configured for US yet')

def load_or_download_csv(local_filename, data_url):
    """
    Loads a CSV file locally or downloads it if not available locally.
    Handles CSV files compressed in ZIP, GZIP, or uncompressed formats.

    Args:
        local_filename (str): Path to the local CSV file.
        data_url (str): URL to download the CSV from if not available locally.

    Returns:
        pd.DataFrame: Loaded dataframe from CSV.
    """
    if os.path.exists(local_filename):
        try:
            df = pd.read_csv(local_filename)
            print(f"Loaded data successfully from {local_filename}.")
            return df
        except Exception as e:
            print(f"Error loading local file, will attempt download: {e}")

    print(f"Downloading data from {data_url}...")
    response = requests.get(data_url, stream=True)
    response.raise_for_status()

    content_type = response.headers.get('Content-Type', '').lower()

    # Handle ZIP compressed files
    if 'zip' in content_type or data_url.lower().endswith('.zip'):
        with zipfile.ZipFile(BytesIO(response.content)) as z:
            csv_files = [name for name in z.namelist() if name.endswith('.csv')]
            if not csv_files:
                raise ValueError("No CSV file found in ZIP archive.")
            with z.open(csv_files[0]) as f:
                df = pd.read_csv(f)

    # Handle GZIP compressed files
    elif 'gzip' in content_type or data_url.lower().endswith('.gz'):
        with gzip.open(BytesIO(response.content), 'rt') as f:
            df = pd.read_csv(f)

    # Handle regular CSV files
    else:
        df = pd.read_csv(BytesIO(response.content))

    # Save locally
    df.to_csv(local_filename, index=False)
    print(f"Data downloaded and saved locally to {local_filename}.")

    return df

def simplify_geom(geom, tolerance=0.1):
    return geom.simplify(tolerance, preserve_topology=True)



In [None]:
# @title Load US and Canada State / Province shapefiles

# Remove GDAL's GeoJSON size limit explicitly
os.environ['OGR_GEOJSON_MAX_OBJ_SIZE'] = '0'

try:
    gdf_us_states = gpd.read_file('simplified_us_state_geos.gpkg')
except Exception as e:
    print("File not found. Attempting to fetch and load shapefile.")

    geo_address = 'https://www2.census.gov/geo/tiger/TIGER2023/STATE/tl_2023_us_state.zip'

    gdf_us_states = (fetch_unzip_load_shapefile_flexible(geo_address, 'state_tiger').to_crs('EPSG:5070'))
# gdf_us_states = gdf_us_states.simplify(
#         tolerance=0.05,  # Adjust this if needed; higher = more simplified
#         preserve_topology=True
#     )
# gdf_us_states = gdf_us_states.simplify()

# https://www2.census.gov/geo/tiger/TIGER2024/ESTATE/tl_2024_78_estate.zip

# Load geographic shapes for US/Canada
# gdf_us_states = gpd.read_file('/content/us_state/tl_2024_us_state.shp').to_crs('EPSG:5070')
    gdf_us_states_ref = gdf_us_states[['STATEFP', 'NAME']].drop_duplicates().sort_values('STATEFP').reset_index(drop=True)
    gdf_us_states_ref['NAME'] = gdf_us_states_ref['NAME'].str.upper()
    gdf_us_states['NAME'] = gdf_us_states['NAME'].str.upper()

# save this ref to a gcs bucket
    gdf_us_states_ref.to_csv('us_state_ref.csv', index=False)
    with mp.Pool(mp.cpu_count()) as pool:
        simplified_geometries = pool.map(simplify_geom, gdf_us_states.geometry)
    gdf_us_states['geometry'] = simplified_geometries
    gdf_us_states.to_file('simplified_us_state_geos.gpkg', driver='GPKG')



repositioned_us = reposition_alaska_hawaii(gdf_us_states).sort_values('STATEFP').reset_index(drop=True)

# Example usage:


try:
    canada_provinces_gdf = gpd.read_file('simplified_canada_province_geos.gpkg')
except Exception as e:
    print("File not found. Attempting to fetch and load shapefile.")
    url = "https://www12.statcan.gc.ca/census-recensement/2021/geo/sip-pis/boundary-limites/files-fichiers/lpr_000b21a_e.zip?st=wk4IrLBG"
    output_directory = '/content/province-shapes/'
    canada_provinces_gdf = fetch_unzip_load_shapefile_flexible(url, output_directory)
# canada_provinces_gdf = canada_provinces_gdf.simplify(
#         tolerance=0.05,  # Adjust this if needed; higher = more simplified
#         preserve_topology=True
#     )

    if canada_provinces_gdf is not None:
        canada_provinces_gdf['PRENAME'] = canada_provinces_gdf['PRENAME'].str.upper()
        canada_provinces_gdf = canada_provinces_gdf.to_crs('EPSG:5070')
        print("\nGeoDataFrame loaded:")
        print(canada_provinces_gdf.head())
        # # Adjust the tolerance to balance simplification and detail retention
        # simplification_tolerance = 0.05  # Example tolerance (increase to simplify more aggressively)
        # # Simplify geometries explicitly
        # canada_provinces_gdf['geometry'] = canada_provinces_gdf['geometry'].simplify(
        #     tolerance=simplification_tolerance, preserve_topology=True
        # )
        # # Ensure geometries remain valid
        # canada_provinces_gdf = canada_provinces_gdf[canada_provinces_gdf.is_valid]
        with mp.Pool(mp.cpu_count()) as pool:
            simplified_geometries = pool.map(simplify_geom, canada_provinces_gdf.geometry)

        canada_provinces_gdf['geometry'] = simplified_geometries
        canada_provinces_gdf = canada_provinces_gdf.copy().sort_values('PRUID').reset_index(drop=True)
        canada_provinces_gdf.to_file('simplified_canada_province_geos.gpkg', driver='GPKG')
    else:
        print("No valid GeoDataFrame loaded.")




In [None]:
import requests


params = {
  "engine": "google_trends",
  "q": "John Deere repair",
  "data_type": "GEO_MAP",
  "tz": 360,
  "geo": "US",
  "resolution": "REGION",
  "timeframe": "today 12-m",
  "inc_low_vol": True,
  "region": "REGION",
  "api_key": "iHtMbpxNLN8j18mG5LyMHL8D"
}

response = requests.get(url, params=params)
# print(response.text)

In [None]:

def safe_extract(x, item='value'):
    try:
        val = x[0][item]
        return val.strip() if isinstance(val, str) else val
    except (IndexError, KeyError, TypeError, AttributeError):
        return None

search_api_io_key = userdata.get('SEARCH_API_IO_KEY')
def load_trends_from_search_api(kw_list, api_key, geo='US'):
    dfs = []
    kw_list = kw_list if isinstance(kw_list, list) else [kw_list]

    for kw in kw_list:
        params = {
            "engine": "google_trends",
            "q": kw,
            "data_type": "GEO_MAP",
            "tz": 360,
            "geo": geo,
            "resolution": "REGION",
            "timeframe": "today 12-m",
            "inc_low_vol": True,
            "region": "REGION",
            "api_key": api_key
        }

        response = requests.get(url, params=params)
        interest_raw = response.json().get('interest_by_region', [])

        df = pd.DataFrame(interest_raw)
        if df.empty:
            print(f"No data found for keyword: {kw}")
            continue
        df['region'] = df['name'].str.strip().str.upper()

        val_col_name = f"{kw.replace(' ', '_').lower()}"
        df[val_col_name] = df['values'].apply(safe_extract, item='extracted_value')
        df[val_col_name] = df[val_col_name].fillna(0).astype(int)

        dfs.append(df[['region', val_col_name]])

    # Concatenate and explicitly aggregate by region
    combo_df = pd.concat(dfs)
    combo_df = combo_df.groupby('region').sum()

    return combo_df





In [None]:
# @title Load USDA and Canada Agriculture census data
# URL to USDA Census of Agriculture 2022 data (corrected link)
data_url = 'https://www.nass.usda.gov/datasets/qs.census2022.txt.gz'

# Download the GZIP file
local_filename = 'us_agriculture.csv'
try:
    df = pd.read_csv(local_filename)
    print("Data loaded successfully!")
except Exception as e:
    print("Error loading data: - downloading", e)
    response = requests.get(data_url)
    response.raise_for_status()
    # Decompress and load the file into a pandas DataFrame
    with gzip.open(BytesIO(response.content), 'rt') as f:
        df = pd.read_csv(f, delimiter='\t')  # adjust delimiter if necessary
        df.to_csv(local_filename, index=False)
df_us = df.copy()

data_url = base_url = "https://www150.statcan.gc.ca/n1/en/tbl/csv/32100239-eng.zip?st=wk4IrLBG"
local_filename = 'ca_agriculture.csv'
df_canada = load_or_download_csv(local_filename, data_url)
# df_canada = df_canada.rename(columns={'Total farm revenues distribution': 'revenue_distribution'})
# df_canada['Total farm revenues distribution'].unique()


geo_level='provincial'
provincial_summary_canada = categorize_revenues(df_canada, geo_level=geo_level, geo_col='GEO', country_geo_val='Canada [000000000]', above_250k_cats=above_250k_categories_canada, below_250k_cats=below_250k_categories_canada, country='CA')

print('/nn')
print(provincial_summary_canada.head())



In [None]:
# Your service account data loaded into a dictionary
# ca_tractor_census = 'https://www150.statcan.gc.ca/t1/tbl1/en/dtl!downloadDbLoadingData-nonTraduit.action?pid=3210022901&latestN=5&startDate=&endDate=&csvLocale=en&selectedMembers=%5B%5B1%5D%2C%5B%5D%5D&checkedLevels=1D1'



In [None]:

# us_diy_over_250k config
us_over_250k_keywords = ['John Deere parts', 'John Deere repair', 'John Deere tractor parts', 'John Deere service manual', 'john deere fuel pump replacement', 'john deere part']

google_geos = fetch_google_ads_geo_targets(client)

# google_geos = pd.read_csv('/content/geotargets-2025-04-01.csv')
google_us_states = google_geos[((google_geos['country_code'] == 'US') & (google_geos['target_type'] == 'State'))].copy().sort_values('name').reset_index(drop=True)
google_us_states = google_us_states[['name', 'id']].sort_values('name').reset_index(drop=True)
google_us_states['name'] = google_us_states['name'].str.upper()
# google_us_states.rename(columns={'Name': 'NAME', 'Criteria ID': 'geo_id'}, inplace=True)
us_states = google_us_states.set_index('name')['id'].to_dict()
google_ca_provinces = google_geos[((google_geos['country_code'] == 'CA') & (google_geos['target_type'] == 'Province'))].copy().sort_values('name').reset_index(drop=True)
google_ca_provinces['name'] = google_ca_provinces['name'].str.upper()
google_ca_provinces = google_ca_provinces[['name', 'id']].sort_values('name').reset_index(drop=True)
ca_provinces = google_ca_provinces.set_index('name')['id'].to_dict()


# df_estimates = get_keyword_estimates(client, keywords, us_states)
ca_over_250k_keywords = [
    'John Deere parts',
    'John Deere repair',
    'John Deere tractor parts',
    'John Deere service manual',
    'John Deere fuel pump replacement',
    'John Deere combine parts',  # Important for grain-growing prairies
    'John Deere baler parts',    # Hay operations prevalent in dairy/livestock regions
    'John Deere planter parts',  # Common in row-crop regions (e.g., corn, soybeans)
    'John Deere sprayer repair',  # Important across grain and specialty crop operations
    'pièces John Deere',
    'réparation John Deere',
    'manuel de service John Deere',
    'pièces tracteur John Deere',
    'pièces moissonneuse-batteuse John Deere'
]








In [None]:

# county_df = df.dropna(subset=['COUNTY_CODE', 'STATE_FIPS_CODE']).copy().reset_index(drop=True)
# county_df['COUNTY_FIPS'] = county_df['STATE_FIPS_CODE'].astype(str).str.zfill(2) + \
#                            county_df['COUNTY_CODE'].astype(int).astype(str).str.zfill(3)
operator_field_to_use = 'COMMODITY TOTALS - OPERATIONS WITH SALES'

In [None]:
# summarize usda farm operations data
state_df = None
county_df = None
national_df = None
state_summary = None
county_summary = None
national_summary = None

below_250k_categories = [
    'FARM SALES: (LESS THAN 1,000 $)',
    'FARM SALES: (1,000 TO 2,499 $)',
    'FARM SALES: (2,500 TO 4,999 $)',
    'FARM SALES: (5,000 TO 9,999 $)',
    'FARM SALES: (10,000 TO 24,999 $)',
    'FARM SALES: (25,000 TO 49,999 $)',
    'FARM SALES: (50,000 TO 99,999 $)',
    'FARM SALES: (100,000 TO 249,999 $)'
]

above_250k_categories = [
    'FARM SALES: (250,000 TO 499,999 $)',
    'FARM SALES: (500,000 TO 999,999 $)',
    'FARM SALES: (1,000,000 OR MORE $)'
]

state_df = df_us[
    (df_us['SHORT_DESC'] == operator_field_to_use) &
    (df_us['AGG_LEVEL_DESC'] == 'STATE') &
    (df_us['YEAR'] == 2022)
].copy()


state_df['VALUE_NUMERIC'] = pd.to_numeric(state_df['VALUE'].str.replace(',', ''), errors='coerce')
state_df.columns.to_list()
# Aggregate clearly by state
state_df['STATEFP'] = state_df['STATE_FIPS_CODE'].astype(int).astype(str).str.zfill(2)
state_summary = state_df.groupby(['STATEFP','STATE_NAME']).apply(lambda x: pd.Series({
    'Ops_below_250k': x[x['DOMAINCAT_DESC'].isin(below_250k_categories)]['VALUE_NUMERIC'].sum(),
    'Ops_250k_or_more': x[x['DOMAINCAT_DESC'].isin(above_250k_categories)]['VALUE_NUMERIC'].sum()
}), include_groups=False).reset_index()

# Add total clearly
state_summary['Total_Ops'] = state_summary['Ops_below_250k'] + state_summary['Ops_250k_or_more']

print(state_summary.head(10))
county_df = df_us[
    (df_us['SHORT_DESC'] == operator_field_to_use) &
    (df_us['AGG_LEVEL_DESC'] == 'COUNTY') &
    (df_us['YEAR'] == 2022)
].copy()

county_df['VALUE_NUMERIC'] = pd.to_numeric(county_df['VALUE'].str.replace(',', ''), errors='coerce')

# Construct County FIPS explicitly
try:
    county_df['FIPS'] = county_df['STATE_FIPS_CODE'].astype(str).str.zfill(2) + \
                        county_df['COUNTY_CODE'].astype(int).astype(str).str.zfill(3)
except ValueError:
    print(ValueError)

county_summary = county_df.groupby(['STATE_NAME', 'COUNTY_NAME', 'FIPS']).apply(lambda x: pd.Series({
    'Ops_below_250k': x[x['DOMAINCAT_DESC'].isin(below_250k_categories)]['VALUE_NUMERIC'].sum(),
    'Ops_250k_or_more': x[x['DOMAINCAT_DESC'].isin(above_250k_categories)]['VALUE_NUMERIC'].sum()
}), include_groups=False).reset_index()

# Calculate total explicitly
county_summary['Total_Ops'] = county_summary['Ops_below_250k'] + county_summary['Ops_250k_or_more']

print(county_summary.head(10))
national_summary = df_us[
    (df_us['SHORT_DESC'] == operator_field_to_use) &
    (df_us['AGG_LEVEL_DESC'] == 'NATIONAL') &
    (df_us['YEAR'] == 2022)
].copy()

national_summary['VALUE_NUMERIC'] = pd.to_numeric(national_summary['VALUE'].str.replace(',', ''), errors='coerce')

national_below_250k = national_summary[
    national_summary['DOMAINCAT_DESC'].isin(below_250k_categories)
]['VALUE_NUMERIC'].sum()

national_above_250k = national_summary[
    national_summary['DOMAINCAT_DESC'].isin(above_250k_categories)
]['VALUE_NUMERIC'].sum()

total_national_ops = national_below_250k + national_above_250k

print(f"National Operations < $250k: {int(national_below_250k):,}")
print(f"National Operations ≥ $250k: {int(national_above_250k):,}")
print(f"National Total Operations: {int(total_national_ops):,}")

In [None]:
us_large_search_trends = load_trends_from_search_api(us_over_250k_keywords, search_api_io_key)

us_large_paid = get_keyword_estimates(client, us_over_250k_keywords, us_states)
ca_search_trends = load_trends_from_search_api(ca_over_250k_keywords, search_api_io_key, geo='CA')
ca_large_paid = get_keyword_estimates(client, ca_over_250k_keywords, ca_provinces)

In [None]:




density_colors = ['#E0EDF7', '#5E96AE', '#237D83', '#1A4599', '#F36D32']

density_cmap = LinearSegmentedColormap.from_list('audience', density_colors)
data_col='adjusted_audience_Ops_250k_or_more'
leg_kwds_dict={'label': "Estimated Audience", 'orientation': "vertical", 'shrink': 0.7}
title_txt = f'Audience Sizing for DIY Interest: US Farm Operations > $250k'
footer_txt = 'Data Sources: USDA, Google Ads API, Google Trends (Data normalized individually then summed)'


# us_large_search_trends = get_trends_via_scrapingbee(us_over_250k_keywords, scrapingbee_api_key)

us_large_trends_prepped = prep_trends_data(us_large_search_trends)


us_large_paid_prepped = prep_paid_search_data(us_large_paid)

counts_col='Ops_250k_or_more'
us_large_audiences = add_us_stats_and_geos(repositioned_us, us_large_trends_prepped, state_summary, us_large_paid_prepped, counts_col=counts_col)
us_large_audiences = us_large_audiences.to_crs(epsg=5070)
vert_max = us_large_audiences['adjusted_audience_Ops_250k_or_more'].max()
data_col = 'adjusted_audience_Ops_250k_or_more'
file_stub = 'us_diy_over250k'
leg_kwds_dict={'label': "Estimated Audience", 'orientation': "vertical", 'shrink': 0.7}
title_txt = f'Audience Sizing for DIY Interest: US Farm Operations >= $250k'
footer_txt = 'Data Sources: 2022 USDA Ag Census, Google Ads API, Google Search Trends'

plot_us_map(us_large_audiences, density_cmap, data_col, leg_kwds_dict, title_txt, footer_txt, vert_max)

us_large_audiences[['NAME', 'STATEFP',counts_col,data_col]].to_csv(f'{file_stub}_adjusted_audience_by_state.csv', index=False)



# ca_search_trends['geoName'] = ca_search_trends['geoName'].apply(normalize_text)
ca_large_trends_prepped = prep_trends_data(ca_search_trends)
# ca_large_trends_prepped['region'] = ca_large_trends_prepped['region'].apply(normalize_text)
ca_large_paid_prepped = prep_paid_search_data(ca_large_paid)

ca_large_paid_prepped
counts_col='Ops_250k_or_more'

ca_large_audiences = add_us_stats_and_geos(canada_provinces_gdf, ca_large_trends_prepped, provincial_summary_canada, ca_large_paid_prepped, counts_col=counts_col, gdf_geo_col='PRENAME',
                                           stats_geo_col='geo_name').copy()

canada_provinces_gdf.columns
ca_large_audiences = ca_large_audiences.to_crs(epsg=5070)
vert_max = ca_large_audiences['adjusted_audience_Ops_250k_or_more'].max()
data_col = 'adjusted_audience_Ops_250k_or_more'
file_stub = 'ca_diy_over250k'
leg_kwds_dict={'label': "Estimated Audience", 'orientation': "vertical", 'shrink': 0.7}
title_txt = f'Audience Sizing for DIY Interest: CA Farm Operations >= $250k'
footer_txt = 'Data Sources: 2021 CA Ag Census, Google Ads API, Google Search Trends'

plot_us_map(ca_large_audiences, density_cmap, data_col, leg_kwds_dict, title_txt, footer_txt, vert_max)
ca_large_audiences[['PRENAME', 'PRUID',counts_col,data_col]].to_csv(f'{file_stub}_adjusted_audience_by_state.csv', index=False)

na_large_audiences = pd.concat([us_large_audiences[['adjusted_audience_Ops_250k_or_more', 'geometry']], ca_large_audiences[['adjusted_audience_Ops_250k_or_more', 'geometry']]])
vert_max = na_large_audiences['adjusted_audience_Ops_250k_or_more'].max()
ile_stub = 'na_diy_over250k'
leg_kwds_dict={'label': "Estimated Audience", 'orientation': "vertical", 'shrink': 0.7}
title_txt = f'Audience Sizing for DIY Interest: NA Farm Operations >= $250k'
footer_txt = 'Data Sources: 2022 USDA Ag Census, 2021 CA Ag Census, Google Ads API, Google Search Trends'
plot_us_map(na_large_audiences, density_cmap, data_col, leg_kwds_dict, title_txt, footer_txt, vert_max)

In [None]:
#  start on small operators
us_under_250k_diy_keywords = [
    'John Deere mower repair',
    'John Deere lawn tractor parts',
    'John Deere mower maintenance',
    'small tractor repair services',
    'small tractor troubleshooting',
    'john deere parts'
]
ca_under_250k_diy_keywords = [
    'John Deere mower repair',
    'John Deere lawn tractor parts',
    'John Deere mower maintenance',
    'small tractor repair services',
    'small tractor troubleshooting',
    'John Deere parts',
    'John Deere snow blower parts',       # Explicitly relevant due to snow conditions
    'John Deere snow blower repair',      # Frequent seasonal repairs
    'garden tractor snow blade',          # Attachments specifically useful in Canada
    'compact tractor winter maintenance',  # Maintenance specific to colder climates
    'réparation tondeuse John Deere',
    'pièces tracteur pelouse John Deere'
]
us_small_search_trends = load_trends_from_search_api(us_under_250k_diy_keywords, search_api_io_key)
us_small_paid = get_keyword_estimates(client, us_under_250k_diy_keywords, us_states)
ca_small_search_trends = load_trends_from_search_api(ca_under_250k_diy_keywords, search_api_io_key, geo='CA')
ca_small_paid = get_keyword_estimates(client, ca_under_250k_diy_keywords, ca_provinces)

In [None]:


us_small_op_trends_prepped = prep_trends_data(us_small_search_trends)
us_small_op_trends_prepped
us_small_op_paid_prepped = prep_paid_search_data(us_small_paid)
us_small_op_paid_prepped
counts_col='Ops_below_250k'
us_small_op_audiences = add_us_stats_and_geos(repositioned_us, us_small_op_trends_prepped, state_summary, us_small_op_paid_prepped, counts_col=counts_col)
sus_mall_op_audiences = us_small_op_audiences.to_crs(epsg=5070)
us_small_op_audiences.loc[us_small_op_audiences['NAME'] == 'PUERTO RICO']
vert_max = us_small_op_audiences['adjusted_audience_Ops_below_250k'].max()
data_col = 'adjusted_audience_Ops_below_250k'
file_stub = 'us_diy_under250k'
leg_kwds_dict={'label': "Estimated Audience", 'orientation': "vertical", 'shrink': 0.7}
title_txt = f'Audience Sizing for DIY Interest: US Farm Operations < $250k'
footer_txt = 'Data Sources: USDA, Google Ads API, Google Trends (Data normalized individually then summed)'

plot_us_map(us_small_op_audiences, density_cmap, data_col, leg_kwds_dict, title_txt, footer_txt, vert_max)
us_small_op_audiences[['NAME', 'STATEFP',counts_col,data_col]].to_csv(f'{file_stub}_adjusted_audience_by_state.csv', index=False)



ca_small_op_trends_prepped = prep_trends_data(ca_small_search_trends)
ca_small_op_trends_prepped
ca_small_op_paid_prepped = prep_paid_search_data(ca_small_paid)
ca_small_op_paid_prepped
counts_col='Ops_below_250k'
ca_small_op_audiences = add_us_stats_and_geos(canada_provinces_gdf, ca_small_op_trends_prepped, provincial_summary_canada, ca_small_op_paid_prepped, counts_col=counts_col, gdf_geo_col='PRENAME',
                                           stats_geo_col='geo_name')
ca_small_op_audiences = ca_small_op_audiences.to_crs(epsg=5070)
vert_max = ca_small_op_audiences['adjusted_audience_Ops_below_250k'].max()
data_col = 'adjusted_audience_Ops_below_250k'
file_stub = 'ca_diy_under250k'
leg_kwds_dict={'label': "Estimated Audience", 'orientation': "vertical", 'shrink': 0.7}
title_txt = f'Audience Sizing for DIY Interest: US Farm Operations < $250k'
footer_txt = 'Data Sources: USDA, Google Ads API, Google Trends (Data normalized individually then summed)'

plot_us_map(ca_small_op_audiences, density_cmap, data_col, leg_kwds_dict, title_txt, footer_txt, vert_max)
ca_small_op_audiences[['PRENAME', 'PRUID',counts_col,data_col]].to_csv(f'{file_stub}_adjusted_audience_by_state.csv', index=False)


na_small_audiences = pd.concat([us_small_op_audiences[['adjusted_audience_Ops_below_250k', 'geometry']], ca_small_op_audiences[['adjusted_audience_Ops_below_250k', 'geometry']]])
vert_max = na_small_audiences['adjusted_audience_Ops_below_250k'].max()
file_stub = 'na_diy_below_250k'
leg_kwds_dict={'label': "Estimated Audience", 'orientation': "vertical", 'shrink': 0.7}
title_txt = f'Audience Sizing for DIY Interest: NA Farm Operations  $250k'
footer_txt = 'Data Sources: 2022 USDA Ag Census, 2021 CA Ag Census, Google Ads API, Google Search Trends'
plot_us_map(na_small_audiences, density_cmap, data_col, leg_kwds_dict, title_txt, footer_txt, vert_max)
