# 1. Making the data model in MySQL

-- Create the schema once; IF NOT EXISTS keeps this script safe to re-run.
CREATE DATABASE IF NOT EXISTS corteva_database;

-- Select the schema so the table below is created inside it rather than in
-- whatever database happens to be the session default (or failing with
-- "No database selected" when there is none).
USE corteva_database;

-- Daily weather observations: one row per station per calendar day.
CREATE TABLE IF NOT EXISTS weather_data (
    id INT AUTO_INCREMENT PRIMARY KEY,         -- Surrogate key
    station_id VARCHAR(50) NOT NULL,           -- Station identifier (the loader uses the source file name without its extension)
    date DATE NOT NULL,                        -- Date of the weather record (format: YYYY-MM-DD)
    max_temp DECIMAL(5,2),                     -- Tenths of a degree Celsius, stored as read by the loader; NULL = missing (-9999 in the raw file)
    min_temp DECIMAL(5,2),                     -- Tenths of a degree Celsius, stored as read by the loader; NULL = missing (-9999 in the raw file)
    precipitation DECIMAL(6,2),                -- Millimeters: the loader divides the raw tenths-of-a-millimeter value by 10; NULL = missing

    -- One record per station per day; this is the key the loader's
    -- INSERT ... ON DUPLICATE KEY UPDATE relies on to make re-ingestion idempotent.
    UNIQUE KEY unique_date_station (date, station_id)
);




# 2. Data Ingestion 

In [4]:
import mysql.connector
from mysql.connector import Error
import os
import logging
from datetime import datetime
from tqdm import tqdm

# Send log records to 'weather_data_log.log'; append mode keeps the history
# of earlier runs.
logging.basicConfig(
    filename='weather_data_log.log',
    filemode='a',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Keyword arguments passed straight to mysql.connector.connect().
# Each value can be overridden through an environment variable so that real
# credentials never have to be committed to source control. The fallbacks are
# the original local-development values and must not be used in production.
db_config = {
    'host': os.environ.get('WEATHER_DB_HOST', 'localhost'),
    'user': os.environ.get('WEATHER_DB_USER', 'root'),
    'password': os.environ.get('WEATHER_DB_PASSWORD', 'Root@123'),
    'database': os.environ.get('WEATHER_DB_NAME', 'corteva_database'),
}

def db_connection(config):
    """Open a MySQL connection from a dict of connector keyword arguments.

    Args:
        config (dict): Keyword arguments forwarded to mysql.connector.connect().

    Returns:
        The connection object, or None when the connector raises while
        connecting (the error is written to the log).
    """
    connection = None
    try:
        connection = mysql.connector.connect(**config)
    except mysql.connector.Error as error:
        # Connection problems are logged, not raised; the caller checks for None.
        logging.error(f"Error connecting to the database: {error}")
    return connection

def _parse_weather_field(raw_value, divisor=1):
    """Convert one raw column to float; the '-9999' missing-data sentinel becomes None."""
    if raw_value == '-9999':
        return None
    return float(raw_value) / divisor


def files_processing(file_path, connection):
    """Parse one station file and upsert its rows into the weather_data table.

    Each data line is expected to hold four whitespace-separated fields:
    date (YYYYMMDD), max temperature, min temperature and precipitation.
    Blank lines are ignored; malformed lines are logged and skipped so that a
    single bad record does not abort the whole ingestion run.

    Args:
        file_path (str): The path to the data file. The file name without its
            extension is used as the station ID.
        connection: Connection object for the MySQL database.

    Returns:
        int: Number of records sent to the database and committed (existing
        rows that were updated are counted too). 0 when the file has no valid
        rows or the batch failed.
    """
    station_id = os.path.splitext(os.path.basename(file_path))[0]  # Station ID comes from the filename
    records_to_insert = []  # Parsed rows awaiting the batch insert

    with open(file_path, 'r') as data_file:
        for line_number, line in enumerate(data_file, start=1):
            fields = line.split()
            if not fields:
                continue  # Blank line (e.g. trailing newline at end of file)

            try:
                date, max_temp_tenths, min_temp_tenths, precipitation_tenths = fields
                if len(date) != 8 or not date.isdigit():
                    raise ValueError(f"invalid date {date!r}")

                # Temperatures are stored exactly as read (tenths of a degree Celsius).
                max_temp = _parse_weather_field(max_temp_tenths)
                min_temp = _parse_weather_field(min_temp_tenths)
                # Precipitation is read in tenths of a millimeter; dividing by 10 stores millimeters.
                precipitation = _parse_weather_field(precipitation_tenths, 10)
            except ValueError as error:
                logging.warning(f"Skipping malformed line {line_number} in {file_path}: {error}")
                continue

            # Convert date format from YYYYMMDD to YYYY-MM-DD
            formatted_date = f"{date[:4]}-{date[4:6]}-{date[6:]}"
            records_to_insert.append((station_id, formatted_date, max_temp, min_temp, precipitation))

    if not records_to_insert:
        return 0

    # Insert new rows, or refresh the measurements of rows that already exist
    # for the same (date, station_id) pair, so re-running ingestion is safe.
    batch_insert_query = """
        INSERT INTO weather_data (station_id , date, max_temp, min_temp, precipitation )
        VALUES (%s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE 
            max_temp = VALUES(max_temp),
            min_temp = VALUES(min_temp),
            precipitation = VALUES(precipitation);
        """

    cursor = connection.cursor()
    try:
        cursor.executemany(batch_insert_query, records_to_insert)
        connection.commit()
    except mysql.connector.Error as error:
        logging.error(f"Error inserting data from file {file_path}: {error}")
        # Discard the partial batch so it cannot be committed together with
        # the next file's data.
        try:
            connection.rollback()
        except mysql.connector.Error as rollback_error:
            logging.error(f"Error rolling back data from file {file_path}: {rollback_error}")
        return 0
    finally:
        cursor.close()  # Always release the cursor

    return len(records_to_insert)

def data_ingestion(directory, config):
    """
    Ingest every '.txt' file found in a local directory into the database.

    Opens one database connection, hands each file to files_processing(),
    and logs/prints the start time, end time, duration and record count.
    The connection is closed even when listing the directory or processing
    a file raises.

    Args:
        directory (str): The path to the directory containing data files.
        config: Configuration object for database connection.

    Returns:
        None
    """
    conn = db_connection(config)  # Establish database connection
    if not conn:
        logging.error("Failed to connect to the database.")
        return  # Nothing can be ingested without a connection

    total_inserted = 0  # Running total across all files
    start_time = datetime.now()
    logging.info(f"Start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
    logging.info("Starting data ingestion process...")
    print(f"Start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
    print("Starting data ingestion process...")

    try:
        # Sorted so that files are processed in the same order on every run
        # (os.listdir returns entries in arbitrary order).
        files = sorted(f for f in os.listdir(directory) if f.endswith('.txt'))

        # One progress-bar tick per file
        with tqdm(total=len(files), desc="Ingesting Data", unit="file") as pbar:
            for filename in files:
                full_path = os.path.join(directory, filename)
                total_inserted += files_processing(full_path, conn)
                pbar.update(1)
    finally:
        # Release the connection even if the directory is missing or a file
        # fails part-way through.
        conn.close()

    end_time = datetime.now()
    duration = end_time - start_time

    # Log and print the summary of the ingestion process
    logging.info(f"End time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
    logging.info(f"Duration: {duration}")
    logging.info(f"Total records processed: {total_inserted}")
    logging.info("Data ingestion completed.")
    print(f"End time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Duration: {duration}")
    print(f"Total records processed: {total_inserted}")
    print("Data ingestion completed.")


In [5]:
if __name__ == "__main__":
    # Directory holding the raw station files. Set WX_DATA_DIR to point at a
    # different location; the fallback is the original local path.
    data_directory = os.environ.get(
        'WX_DATA_DIR',
        'C:/Users/ariza/OneDrive/Desktop/code-challenge-template-main/wx_data',
    )
    data_ingestion(data_directory, db_config)

Start time: 2025-02-06 19:56:44
Starting data ingestion process...


Ingesting Data: 100%|██████████████████████████████████████████████████████████████| 167/167 [02:35<00:00,  1.07file/s]

End time: 2025-02-06 19:59:19
Duration: 0:02:35.705451
Total records processed: 1729957
Data ingestion completed.





# 3. Analysis 

-- Yearly aggregates per station, derived from weather_data.
-- IF NOT EXISTS keeps this script safe to re-run.
CREATE TABLE IF NOT EXISTS weather_statistics (
    stat_id INT AUTO_INCREMENT PRIMARY KEY,    -- Surrogate key identifying the entry
    station_id VARCHAR(50) NOT NULL,           -- Station identifier, same values as weather_data.station_id
    year INT NOT NULL,                         -- Calendar year of the aggregated records (YYYY)
    avg_max_temp DECIMAL(5, 2),                -- Yearly average of weather_data.max_temp (same unit as that column); NULL if not computable
    avg_min_temp DECIMAL(5, 2),                -- Yearly average of weather_data.min_temp (same unit as that column); NULL if not computable
    total_precipitation DECIMAL(6, 2),         -- Yearly sum of weather_data.precipitation (same unit as that column); NULL if not computable

    -- One row per station per year; this is the key the statistics job's
    -- INSERT ... ON DUPLICATE KEY UPDATE relies on.
    UNIQUE KEY unique_station_year (station_id, year)
);

In [None]:
import mysql.connector
from mysql.connector import Error
import os
import logging
from datetime import datetime
from tqdm import tqdm

# Send log records to 'weather_data_log.log'; append mode keeps the history
# of earlier runs.
logging.basicConfig(
    filename='weather_data_log.log',
    filemode='a',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Keyword arguments passed straight to mysql.connector.connect().
# Each value can be overridden through an environment variable so that real
# credentials never have to be committed to source control. The fallbacks are
# the original local-development values and must not be used in production.
db_config = {
    'host': os.environ.get('WEATHER_DB_HOST', 'localhost'),
    'user': os.environ.get('WEATHER_DB_USER', 'root'),
    'password': os.environ.get('WEATHER_DB_PASSWORD', 'Root@123'),
    'database': os.environ.get('WEATHER_DB_NAME', 'corteva_database'),
}

def db_connection(config):
    """Connect to MySQL using the keyword arguments held in ``config``.

    Returns the open connection on success. When the connector raises, the
    error is logged and None is returned so the caller can bail out cleanly.
    """
    try:
        return mysql.connector.connect(**config)
    except mysql.connector.Error as connect_error:
        logging.error(f"Error connecting to the database: {connect_error}")
        return None

def weather_data_statistics(config):
    """
    Compute yearly per-station statistics from weather_data and upsert them
    into the weather_statistics table of the same database.

    For every (station_id, year) pair the job stores the average maximum
    temperature, the average minimum temperature and the total precipitation.
    Missing measurements are stored as NULL and are ignored by AVG/SUM on a
    per-column basis, so a day with, say, a valid max_temp but a missing
    precipitation value still contributes to avg_max_temp. A statistic with no
    non-NULL input for the year is stored as NULL.

    All upserts are committed together; on a database error the transaction is
    rolled back and the failure is logged and printed.

    Args:
        config: Configuration object for database connection.

    Returns:
        None
    """
    conn = db_connection(config)
    if not conn:
        logging.error("Failed to connect to the database.")
        print("Failed to connect to the database.")
        return

    start_time = datetime.now()
    logging.info("Starting weather data statistics calculation...")
    print("Starting weather data statistics calculation...")

    # No WHERE filter on NULLs: the aggregates already skip NULL inputs, and
    # filtering whole rows would discard valid values of the other columns.
    query = """
        SELECT station_id, 
               YEAR(date) as year, 
               AVG(max_temp) as avg_max_temp, 
               AVG(min_temp) as avg_min_temp, 
               SUM(precipitation) as total_precipitation
        FROM weather_data
        GROUP BY station_id, YEAR(date)
        """

    # Built once, outside the loop; re-running the job refreshes existing rows.
    upsert_query = """
        INSERT INTO weather_statistics (station_id, year, avg_max_temp, avg_min_temp, total_precipitation)
        VALUES (%s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE avg_max_temp=VALUES(avg_max_temp), 
                                avg_min_temp=VALUES(avg_min_temp),
                                total_precipitation=VALUES(total_precipitation)
        """

    cursor = None  # Defined up front so the cleanup below never hits an unbound name
    succeeded = False
    try:
        cursor = conn.cursor()
        cursor.execute(query)
        results = cursor.fetchall()

        # Each row is (station_id, year, avg_max_temp, avg_min_temp, total_precipitation),
        # which matches the placeholder order of the upsert.
        for row in tqdm(results, desc="Updating statistics"):
            cursor.execute(upsert_query, row)
        conn.commit()
        succeeded = True
    except Error as e:
        logging.error(f"Database error: {e}")
        # Do not leave a half-written batch pending on the connection.
        try:
            conn.rollback()
        except Error as rollback_error:
            logging.error(f"Rollback failed: {rollback_error}")
    finally:
        if conn.is_connected():
            if cursor is not None:
                cursor.close()
            conn.close()
            logging.info("Database connection closed.")

    end_time = datetime.now()
    duration = end_time - start_time
    if succeeded:
        logging.info(f"Statistics calculation completed. Start time: {start_time}, End time: {end_time}, Duration: {duration}")
        print(f"Statistics calculation completed. Start time: {start_time}, End time: {end_time}, Duration: {duration}")
    else:
        # Report the failure instead of claiming completion.
        logging.error(f"Statistics calculation failed. Start time: {start_time}, End time: {end_time}, Duration: {duration}")
        print(f"Statistics calculation failed. Start time: {start_time}, End time: {end_time}, Duration: {duration}")


In [9]:
if __name__ == "__main__":
    # Entry point: compute and store the yearly statistics using the
    # module-level connection settings.
    weather_data_statistics(config=db_config)

Starting weather data statistics calculation...


Updating statistics: 100%|███████████████████████████████████████████████████████| 4791/4791 [00:02<00:00, 2395.38it/s]

Statistics calculation completed. Start time: 2025-02-06 20:05:08.232571, End time: 2025-02-06 20:05:33.032399, Duration: 0:00:24.799828





# 4. API creation and testing

Steps:
1. Run: python API_CALL.py
2. Open the endpoints in a browser or API client:
   -> http://127.0.0.1:5000/api/weather
   -> http://127.0.0.1:5000/api/weather/stats
A screenshot of the JSON output is attached.


# EXTRA

Deploying a complete weather data analysis and API service on AWS involves multiple stages, including database setup, data ingestion, data processing, and exposing the processed data through an API.

Task 1: Data Modeling
For storing relational data on AWS, Amazon RDS with MySQL is a strong choice, as it supports familiar SQL-based data modeling. The schema stays the same as the weather_data and weather_statistics tables defined above, so it can be created on Amazon RDS for MySQL without changes.

Task 2: Data Ingestion
Raw weather data can be initially stored in Amazon S3. To process this data, an AWS Lambda function is utilized. This serverless compute service triggers code execution when data changes occur in an S3 bucket, making it ideal for ingestion tasks. The Lambda function reads raw data files from S3, applies necessary transformations, and loads the processed data into the Amazon RDS MySQL database.

Task 3: Data Analysis
AWS Lambda works alongside Amazon RDS to compute and store statistical insights. A Lambda function can be scheduled to run SQL queries that calculate key metrics, such as the average minimum and maximum temperatures and total precipitation per station annually. These computed results are then stored in a dedicated table within the RDS database for easy retrieval and analysis.

Task 4: REST API
To make both raw and analyzed weather data accessible, a REST API is developed using Flask. AWS Elastic Beanstalk, a Platform-as-a-Service (PaaS) offering, simplifies the deployment and scaling of the API. By hosting the Flask application on Elastic Beanstalk, API endpoints such as /api/weather and /api/weather/stats become available. These endpoints return JSON-formatted weather data, supporting filtering by station ID and date, with responses served directly from the RDS database.

AWS Services Utilized
Amazon RDS – Managed relational database for structured weather data storage.
Amazon S3 – Secure, scalable storage for raw weather data.
AWS Lambda – Serverless compute service for automating data ingestion and analysis.
AWS Elastic Beanstalk – PaaS for easy API deployment and scaling.
By leveraging these AWS services, this solution ensures a scalable, efficient, and well-integrated weather data processing system, from ingestion to analysis and API accessibility.