In [9]:
from google.colab import drive
import json
import yaml
import requests
import os
import pandas as pd
import logging
from logging.handlers import RotatingFileHandler
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Setup logging
log_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
log_file = '/content/drive/MyDrive/Intern Task/data_collection.log'

# Create a custom logger
logger = logging.getLogger(__name__)

# Create handlers
file_handler = RotatingFileHandler(log_file, maxBytes=5*1024*1024, backupCount=2)
file_handler.setFormatter(log_formatter)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(log_formatter)

# Set level for handlers
file_handler.setLevel(logging.INFO)
stream_handler.setLevel(logging.INFO)

# Add handlers to the logger
logger.addHandler(file_handler)
logger.addHandler(stream_handler)

# Set the logging level for the logger
logger.setLevel(logging.INFO)

# Test logging configuration
logger.info("Logging setup complete. This is a test log message.")

# Mount Google Drive to access files
logger.info("Mounting Google Drive")
try:
    drive.mount('/content/drive')
    logger.info("Google Drive mounted successfully.")
except Exception as e:
    logger.error("Failed to mount Google Drive: %s", e)
    raise

# Load the configuration file from the specified path
config_file_path = '/content/drive/MyDrive/Intern Task/config.yaml'
logger.info("Loading Configuration File")
try:
    with open(config_file_path, 'r') as config_file:
        config = yaml.safe_load(config_file)
    logger.info("Configuration file loaded successfully.")
except Exception as e:
    logger.error("Failed to load configuration file: %s", e)
    raise

# Setup requests session with retry strategy
session = requests.Session()
retry_strategy = Retry(
    total=3,  # Number of retries
    status_forcelist=[429, 500, 502, 503, 504],  # Retry for these status codes
    allowed_methods=["HEAD", "GET", "OPTIONS"],  # Retry for these methods
    backoff_factor=1  # Backoff factor for retries
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("https://", adapter)
session.mount("http://", adapter)

# Helper function to make API requests
def make_api_request(base_url, endpoint, params):
    """
    Makes an API request to the given endpoint with the specified parameters.

    Args:
    - base_url (str): The base URL of the API.
    - endpoint (dict): A dictionary containing the endpoint path and other details.
    - params (dict): A dictionary of parameters to be sent in the API request.

    Returns:
    - dict: The JSON response from the API if the request is successful.
    """
    url = f"{base_url}{endpoint['path']}"
    logger.info("Making API Request to %s", url)
    try:
        response = session.get(url, params=params)
        response.raise_for_status()
        logger.info("API request successful for %s", url)
        return response.json()
    except requests.RequestException as e:
        logger.error("API request failed for %s with params %s: %s", url, params, e)
        return {}

# Function to fetch data from USDA ARMS Data API
def fetch_usda_data(config):
    """
    Fetches data from the USDA ARMS Data API based on the configuration.

    Args:
    - config (dict): The configuration dictionary containing API details.

    Returns:
    - dict: A dictionary containing the fetched data for each endpoint.
    """
    data = {}
    base_url = config['data_sources']['USDA_ARMS']['base_url']
    for endpoint_name, endpoint in config['data_sources']['USDA_ARMS']['endpoints'].items():
        params = endpoint['params']
        logger.info("Fetching USDA Data for %s", endpoint_name)
        data[endpoint_name] = make_api_request(base_url, endpoint, params)
    return data

# Function to fetch data from FRED API
def fetch_fred_data(config):
    """
    Fetches data from the FRED API based on the configuration.

    Args:
    - config (dict): The configuration dictionary containing API details.

    Returns:
    - dict: A dictionary containing the fetched data for each endpoint.
    """
    data = {}
    base_url = config['data_sources']['FRED']['base_url']
    for endpoint_name, endpoint in config['data_sources']['FRED']['endpoints'].items():
        params = endpoint['params']
        logger.info("Fetching FRED Data for %s", endpoint_name)
        data[endpoint_name] = make_api_request(base_url, endpoint, params)
    return data

# Function to flatten nested dictionaries
def flatten_dict(d, parent_key='', sep='_'):
    """
    Flattens a nested dictionary.

    Args:
    - d (dict): The dictionary to flatten.
    - parent_key (str): The base key for the flattened dictionary.
    - sep (str): The separator between keys.

    Returns:
    - dict: The flattened dictionary.
    """
    items = []
    for k, v in d.items():
        new_key = f"{parent_key}{sep}{k}" if parent_key else k
        if isinstance(v, dict):
            items.extend(flatten_dict(v, new_key, sep=sep).items())
        elif isinstance(v, list):
            for i, item in enumerate(v):
                if isinstance(item, dict):
                    items.extend(flatten_dict(item, f"{new_key}_{i}", sep=sep).items())
                else:
                    items.append((f"{new_key}_{i}", item))
        else:
            items.append((new_key, v))
    return dict(items)

# Function to save data to JSON files in Google Drive
def save_data(data, folder):
    """
    Saves data to JSON files in the specified Google Drive folder.

    Args:
    - data (dict): The data to save.
    - folder (str): The folder path in Google Drive.
    """
    drive_folder = f"/content/drive/MyDrive/Intern Task/{folder}"
    if not os.path.exists(drive_folder):
        os.makedirs(drive_folder)
    for endpoint_name, endpoint_data in data.items():
        file_path = os.path.join(drive_folder, f"{endpoint_name}.json")
        logger.info("Saving JSON Data for %s to %s", endpoint_name, file_path)
        try:
            with open(file_path, 'w') as file:
                json.dump(endpoint_data, file, indent=2)
            logger.info("Saved JSON data for %s to %s", endpoint_name, file_path)
        except IOError as e:
            logger.error("Failed to save JSON data for %s to %s: %s", endpoint_name, file_path, e)

# Function to save data to CSV files in Google Drive
def save_data_csv(data, folder):
    """
    Saves data to CSV files in the specified Google Drive folder.

    Args:
    - data (dict): The data to save.
    - folder (str): The folder path in Google Drive.
    """
    drive_folder = f"/content/drive/MyDrive/Intern Task/{folder}"
    if not os.path.exists(drive_folder):
        os.makedirs(drive_folder)
    for endpoint_name, endpoint_data in data.items():
        df = pd.DataFrame(endpoint_data)
        file_path = os.path.join(drive_folder, f"{endpoint_name}.csv")
        logger.info("Saving CSV Data for %s to %s", endpoint_name, file_path)
        try:
            df.to_csv(file_path, index=False)
            logger.info("Saved CSV data for %s to %s", endpoint_name, file_path)
        except IOError as e:
            logger.error("Failed to save CSV data for %s to %s: %s", endpoint_name, file_path, e)

# Function to clean the data
def clean_data(df):
    """
    Cleans the DataFrame by handling nested dictionaries, lists, duplicates, and missing values.

    Args:
    - df (pd.DataFrame): The DataFrame to clean.

    Returns:
    - pd.DataFrame: The cleaned DataFrame.
    """
    logger.info("Cleaning Data")
    try:
        for col in df.columns:
            if df[col].apply(type).eq(dict).any():
                df[col] = df[col].apply(lambda x: json.dumps(x) if isinstance(x, dict) else x)
            elif df[col].apply(type).eq(list).any():
                df[col] = df[col].apply(lambda x: json.dumps(x) if isinstance(x, list) else x)

        df.drop_duplicates(inplace=True)
        df.fillna('Unknown', inplace=True)

        logger.info("Data cleaned successfully.")
        return df
    except Exception as e:
        logger.error("Data cleaning failed: %s", e)
        return df

# Function to process and save USDA data
def process_usda_data(config):
    """
    Processes and saves USDA data based on the configuration.

    Args:
    - config (dict): The configuration dictionary containing API details.
    """
    logger.info("Processing USDA Data")
    try:
        data = fetch_usda_data(config)
        for endpoint_name, endpoint_data in data.items():
            df = pd.DataFrame(endpoint_data['data']) if 'data' in endpoint_data else pd.DataFrame(endpoint_data)
            df_cleaned = clean_data(df)
            save_data({endpoint_name: df_cleaned.to_dict(orient='records')}, 'USDA')
            save_data_csv({endpoint_name: df_cleaned.to_dict(orient='records')}, 'USDA')
        logger.info("USDA data processing completed successfully.")
    except Exception as e:
        logger.error("Failed to process USDA data: %s", e)

# Function to process and save FRED data
def process_fred_data(config):
    """
    Processes and saves FRED data based on the configuration.

    Args:
    - config (dict): The configuration dictionary containing API details.
    """
    logger.info("Processing FRED Data")
    try:
        data = fetch_fred_data(config)
        for endpoint_name, endpoint_data in data.items():
            observations = endpoint_data.get('observations', [])
            df = pd.DataFrame(observations)
            df_cleaned = clean_data(df)
            save_data({endpoint_name: df_cleaned.to_dict(orient='records')}, 'FRED')
            save_data_csv({endpoint_name: df_cleaned.to_dict(orient='records')}, 'FRED')
        logger.info("FRED data processing completed successfully.")
    except Exception as e:
        logger.error("Failed to process FRED data: %s", e)

# Process and save USDA data
process_usda_data(config)

# Process and save FRED data
process_fred_data(config)

logger.info("Data Processing Completed")


2024-07-01 06:07:48,371 - INFO - Logging setup complete. This is a test log message.
2024-07-01 06:07:48,371 - INFO - Logging setup complete. This is a test log message.
2024-07-01 06:07:48,371 - INFO - Logging setup complete. This is a test log message.
INFO:__main__:Logging setup complete. This is a test log message.
2024-07-01 06:07:48,385 - INFO - Mounting Google Drive
2024-07-01 06:07:48,385 - INFO - Mounting Google Drive
2024-07-01 06:07:48,385 - INFO - Mounting Google Drive
INFO:__main__:Mounting Google Drive
2024-07-01 06:07:48,929 - INFO - Google Drive mounted successfully.
2024-07-01 06:07:48,929 - INFO - Google Drive mounted successfully.
2024-07-01 06:07:48,929 - INFO - Google Drive mounted successfully.
INFO:__main__:Google Drive mounted successfully.
2024-07-01 06:07:48,936 - INFO - Loading Configuration File
2024-07-01 06:07:48,936 - INFO - Loading Configuration File
2024-07-01 06:07:48,936 - INFO - Loading Configuration File
INFO:__main__:Loading Configuration File
2024

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


2024-07-01 06:07:49,831 - INFO - API request successful for https://api.ers.usda.gov/data/arms/surveydata
2024-07-01 06:07:49,831 - INFO - API request successful for https://api.ers.usda.gov/data/arms/surveydata
2024-07-01 06:07:49,831 - INFO - API request successful for https://api.ers.usda.gov/data/arms/surveydata
INFO:__main__:API request successful for https://api.ers.usda.gov/data/arms/surveydata
2024-07-01 06:07:49,840 - INFO - Fetching USDA Data for yearly_data
2024-07-01 06:07:49,840 - INFO - Fetching USDA Data for yearly_data
2024-07-01 06:07:49,840 - INFO - Fetching USDA Data for yearly_data
INFO:__main__:Fetching USDA Data for yearly_data
2024-07-01 06:07:49,853 - INFO - Making API Request to https://api.ers.usda.gov/data/arms/year
2024-07-01 06:07:49,853 - INFO - Making API Request to https://api.ers.usda.gov/data/arms/year
2024-07-01 06:07:49,853 - INFO - Making API Request to https://api.ers.usda.gov/data/arms/year
INFO:__main__:Making API Request to https://api.ers.usda.