# Validating data notebook

#### This notebook demonstrates how to validate data using another data source

This is done using a data `pipeline` that ingests and cleans our data at the press of a button.

We will use a dataset from an imaginary country called `Maji Ndogo`, created by the `Explore-AI` academy. The dataset contains agricultural data about the fields of `Maji Ndogo`.

The dataset also contains weather data, which we will validate against measurements collected from weather stations in the area of each field.

Our main goal is: Is the data in our `MD_agric_df` dataset representative of reality? To answer this, we use weather-related data from nearby stations to validate our results. If the weather data matches the data we have, we can be more confident that our dataset represents reality. 

So what's the plan? 
1. Create a null hypothesis.
2. Import the `MD_agric_df` dataset and clean it up.
3. Import the weather data.
4. Map the weather data to the field data.
5. Calculate the means of the weather station dataset and the means of the main dataset.
6. Calculate all the parameters we need to do a t-test.
7. Interpret our results.
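To give a feel for what "the parameters we need to do a t-test" look like, here is a minimal sketch that computes a t-statistic from two samples using only the standard library. It assumes Welch's two-sample form (unequal variances), which is one common choice and not necessarily the exact test used later in this notebook. The function name and the sample values are made up purely for illustration.

```python
import math
import statistics


def welch_t_statistic(sample_a, sample_b):
    """Welch's t-statistic for two samples (hypothetical helper, illustration only)."""
    mean_a, mean_b = statistics.mean(sample_a), statistics.mean(sample_b)
    # statistics.variance() is the sample variance (divides by n - 1)
    var_a, var_b = statistics.variance(sample_a), statistics.variance(sample_b)
    standard_error = math.sqrt(var_a / len(sample_a) + var_b / len(sample_b))
    return (mean_a - mean_b) / standard_error


# Made-up numbers, NOT taken from the Maji Ndogo data
field_values = [2.0, 4.0, 6.0]
station_values = [1.0, 2.0, 3.0]

t_stat = welch_t_statistic(field_values, station_values)
print(f"t-statistic: {t_stat:.3f}")
```

The larger the absolute value of the t-statistic, the less plausible it is that the two samples share the same mean. The p-value that goes with it is normally obtained from a statistics library rather than by hand.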

# Data dictionary

**1. Geographic features**

- **Field_ID:** A unique identifier for each field (BigInt).
 
- **Elevation:** The elevation of the field above sea level in metres (Float).

- **Latitude:** Geographical latitude of the field in degrees (Float).

- **Longitude:** Geographical longitude of the field in degrees (Float).

- **Location:** Province the field is in (Text).

- **Slope:** The slope of the land in the field (Float).

**2. Weather features**

- **Field_ID:** Corresponding field identifier (BigInt).

- **Rainfall:** Amount of rainfall in the area in mm (Float).

- **Min_temperature_C:** Average minimum temperature recorded in Celsius (Float).

- **Max_temperature_C:** Average maximum temperature recorded in Celsius (Float).

- **Ave_temps:** Average temperature in Celsius (Float).

**3. Soil and crop features**

- **Field_ID:** Corresponding field identifier (BigInt).

- **Soil_fertility:** A measure of soil fertility where 0 is infertile soil, and 1 is very fertile soil (Float).

- **Soil_type:** Type of soil present in the field (Text).

- **pH:** pH level of the soil, which is a measure of how acidic/basic the soil is (Float).

**4. Farm management features**

- **Field_ID:** Corresponding field identifier (BigInt).

- **Pollution_level:** Level of pollution in the area where 0 is unpolluted and 1 is very polluted (Float).

- **Plot_size:** Size of the plot in the field (Ha) (Float).

- **Chosen_crop:** Type of crop chosen for cultivation (Text).

- **Annual_yield:** Annual yield from the field (Float). This is the total output of the field. The field size and type of crop will affect the annual yield.

- **Standard_yield:** Standardised yield expected from the field, normalised per crop (Float). This is independent of field size, or crop type. Multiplying this number by the field size, and average crop yield will give the Annual_Yield.

<br>

**Weather_station_data (CSV)**

- **Weather_station_ID:** The weather station the data originated from. (Int)

- **Message:** The weather data was captured by sensors at the stations, in the format of text messages. (Str)

**Weather_data_field_mapping (CSV)**

- **Field_ID:** The id of the field that is connected to a weather station. This is the key we can use to join the weather station ID to the original data. (Int)

- **Weather_station_ID:** The weather station that is connected to a field. If a field has `weather_station_ID = 0` then that field is closest to weather station 0. (Int)

<br>

# Dealing with a friendly warning from Pandas

If you are running this notebook in `Python 3.12` or later, you might get a warning if you run the imports below:

In [2]:
import re
import numpy as np
import pandas as pd
import os

If you are lucky enough to see this warning, it lets us know that Pandas is changing soon and will require another package to be installed.

```python
...2334042735.py:3: DeprecationWarning: 
Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd
```

We can safely ignore this warning for now, but it tells us that Pyarrow is planned to become a required dependency of Pandas, at which point our script would fail to import Pandas without it. Let's fix it today so we won't have to worry about it for a long time: we can just install Pyarrow with pip.

In [None]:
%pip install Pyarrow

# Creating our data pipeline

So here's the plan: 

1. Create a module for each step.

2. We will create three modules: 

    a. `data_ingestion.py` - All SQL-related functions, and web-based data retrieval.

    b. `field_data_processor.py` - All transformations, cleanup, and merging functionality.

    c. `weather_data_processor.py` - All transformations and cleanup of the weather station data.

3. Test the modules' functionality.

4. Create automated data validation tests to ensure our data is as we expect it to be.

Once we're done with that, we're going to jump into the reason why we're here. So let's get started!

The first challenge: automating the data ingestion. There are two places we're fetching data from:
1. SQLite database - We need to create an SQLite engine, connect to the database, run a query and return a pandas DataFrame.
2. Web CSV file - Read the CSV data from the web, and import it as a DataFrame.

So let's start building!

## Data ingestion

Creating modules in Jupyter notebooks is a bit of a pain. If we make changes to the `module.py` file, the notebook keeps using the version it already imported, so we have to restart the notebook kernel (or explicitly reload the module) and import it again to apply those changes. So, we're going to fully develop the code, test it in this notebook, and only then move it to a `data_ingestion.py` file and import it.
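For quick experiments, reloading a module can stand in for a full kernel restart. The sketch below shows the idea with `importlib.reload` from the standard library. The module name `demo_ingestion` and its `source()` function are hypothetical. They are written to a temporary folder only so that the example is self-contained.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Hypothetical throwaway module, created in a temp folder purely for illustration
module_dir = Path(tempfile.mkdtemp())
module_file = module_dir / "demo_ingestion.py"
module_file.write_text("def source():\n    return 'version 1'\n")

sys.path.insert(0, str(module_dir))
import demo_ingestion

first = demo_ingestion.source()

# Edit the file on disk, as we would while developing the module
module_file.write_text("def source():\n    return 'version 2 (edited)'\n")

# Without a reload the notebook would still use the old code
importlib.invalidate_caches()
importlib.reload(demo_ingestion)
second = demo_ingestion.source()

print(first, "->", second)
```

Keep in mind that names imported with `from module import name` still point at the old objects after a reload and need to be imported again, which is one reason a clean kernel restart remains the safest option.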

To create a module, our code should ideally be encapsulated in functions or classes. How do we choose?

Since the process of connecting to a database, and querying some data is relatively straightforward, functions seemed like the best fit. Functions allow us to encapsulate the necessary steps in clear, reusable blocks of code without the overhead of managing class objects. It's like using a **simple tool** for a specific task — pick it up, use it, and put it back without needing to remember anything about the last use.

On the other hand, our **data processing** modules are a bit more complex. These modules not only perform various operations on the data but also need to keep track of the data as it goes through these processes. For this, we use **classes** to **create DataFrame objects** as attributes. This approach simplifies data handling since we're passing the object around, which inherently knows its data and the operations it can perform within the class.

This idea ties back to OOP. Class objects are designed to deal with data and operations on that data via methods, while functions are made to do the simpler tasks.

So for **data ingestion**, we're just going to use functions.
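To make the distinction concrete, here is a minimal sketch in plain Python. The helper `strip_whitespace` and the class `CropNameProcessor` are hypothetical and are not part of our modules. They only illustrate a stateless function next to a class that holds on to its data between steps.

```python
# Stateless helper: everything it needs comes in as arguments, nothing is remembered
def strip_whitespace(values):
    return [value.strip() for value in values]


# Stateful processor: the data lives on the object and each method updates it
class CropNameProcessor:  # hypothetical class, for illustration only
    def __init__(self, values, corrections):
        self.values = list(values)
        self.corrections = corrections

    def clean(self):
        self.values = strip_whitespace(self.values)
        return self

    def correct(self):
        # Replace known misspellings, leave everything else unchanged
        self.values = [self.corrections.get(value, value) for value in self.values]
        return self

    def process(self):
        # Run the steps in order and hand back the final data
        return self.clean().correct().values


processor = CropNameProcessor(
    [" cassaval", "tea ", "wheatn"],
    {"cassaval": "cassava", "wheatn": "wheat"},
)
cleaned = processor.process()
print(cleaned)
```

The function can be reused anywhere without any setup, while the class saves us from passing the data from one step to the next by hand.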

In [3]:
from sqlalchemy import create_engine, text

In [4]:
def create_db_engine(db_path):
    engine = create_engine(db_path)
    return engine

def query_data(engine, sql_query):
    with engine.connect() as connection:
        df = pd.read_sql_query(text(sql_query), connection)
        return df

# Here is an example

In [6]:
SQL_engine = create_db_engine('sqlite:///Maji_Ndogo_farm_survey_small.db')

sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""


df = query_data(SQL_engine, sql_query)
df

Unnamed: 0,Field_ID,Elevation,Latitude,Longitude,Location,Slope,Rainfall,Min_temperature_C,Max_temperature_C,Ave_temps,Soil_fertility,Soil_type,pH,Pollution_level,Plot_size,Crop_type,Annual_yield,Standard_yield
0,40734,786.05580,-7.389911,-7.556202,Rural_Akatsi,14.795113,1125.2,-3.1,33.1,15.00,0.62,Sandy,6.169393,8.526684e-02,1.3,0.751354,cassava,0.577964
1,30629,674.33410,-7.736849,-1.051539,Rural_Sokoto,11.374611,1450.7,-3.9,30.6,13.35,0.64,Volcanic,5.676648,3.996838e-01,2.2,1.069865,cassava,0.486302
2,39924,826.53390,-9.926616,0.115156,Rural_Sokoto,11.339692,2208.9,-1.8,28.4,13.30,0.69,Volcanic,5.331993,3.580286e-01,3.4,2.208801,tea,0.649647
3,5754,574.94617,-2.420131,-6.592215,Rural_Kilimani,7.109855,328.8,-5.8,32.2,13.20,0.54,Loamy,5.328150,2.866871e-01,2.4,1.277635,cassava,0.532348
4,14146,886.35300,-3.055434,-7.952609,Rural_Kilimani,55.007656,785.2,-2.5,31.0,14.25,0.72,Sandy,5.721234,4.319027e-02,1.5,0.832614,wheat,0.555076
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
5649,11472,681.36145,-7.358371,-6.254369,Rural_Akatsi,16.213196,885.7,-4.3,33.4,14.55,0.61,Sandy,5.741063,3.286828e-01,1.1,0.609930,potato,0.554482
5650,19660,667.02120,-3.154559,-4.475046,Rural_Kilimani,2.397553,501.1,-4.8,32.1,13.65,0.54,Sandy,5.445833,1.602583e-01,8.7,3.812289,maize,0.438194
5651,41296,670.77900,-14.472861,-6.110221,Rural_Hawassa,7.636470,1586.6,-3.8,33.4,14.80,0.64,Volcanic,5.385873,8.221326e-09,2.1,1.681629,tea,0.800776
5652,33090,429.48840,-14.653089,-6.984116,Rural_Hawassa,13.944720,1272.2,-6.2,34.6,14.20,0.63,Silt,5.562508,6.917245e-10,1.3,0.659874,cassava,0.507595


This seems simple, but let's think for a second about what could go wrong. For example, what if the database has a schema but no actual data? Or what if our query doesn't return any data?

In [7]:
SQL_engine = create_db_engine('sqlite:///Maji_Ndogo_farm_survey_small.db')

sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
WHERE Rainfall < 0 
"""
# The last line won't ever be true, so no results will be returned. 

df = query_data(SQL_engine, sql_query)
df

Unnamed: 0,Field_ID,Elevation,Latitude,Longitude,Location,Slope,Rainfall,Min_temperature_C,Max_temperature_C,Ave_temps,Soil_fertility,Soil_type,pH,Pollution_level,Plot_size,Crop_type,Annual_yield,Standard_yield


We get an empty DataFrame, because SQL returned an empty query result. If we carried on and filtered or aggregated this DataFrame, the answers we got would not make sense.

To avoid this, we need to add error handling to our code so that it stops the process when something is wrong and tells us what the problem is before we continue.

Secondly, to help us understand how our code is executing, we're going to add some logs. While print statements can help us debug our code, we have to remove them one by one once our code goes into use. `logging` is a better way to debug than print statements because we can add `logging.info()` messages to know what our code is doing, and `logging.debug()` messages with more detail in case we want to debug a specific loop. There are various other tools in the module too, and we can silence all logging with a single line of code instead of commenting out print statements one by one.
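As a small illustration of how log levels behave, the sketch below sets up a logger, shows which messages pass at each level, and then silences everything with one line. The logger name `logging_demo` is hypothetical, and the in-memory stream is only used so that the result is easy to inspect.

```python
import io
import logging

# Hypothetical logger writing to an in-memory stream instead of the console
stream = io.StringIO()
demo_logger = logging.getLogger("logging_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False  # keep these messages away from the root logger
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(name)s - %(levelname)s - %(message)s"))
demo_logger.addHandler(handler)

demo_logger.info("Starting ingestion.")      # recorded: INFO passes the INFO threshold
demo_logger.debug("Row 17 looked unusual.")  # dropped: DEBUG is below INFO

demo_logger.setLevel(logging.DEBUG)          # ask for more detail
demo_logger.debug("Now debug messages appear.")

logging.disable(logging.CRITICAL)            # one line silences every logger
demo_logger.info("This message is suppressed.")
logging.disable(logging.NOTSET)              # restore normal behaviour

captured = stream.getvalue().splitlines()
print(captured)
```

Switching between quiet and detailed output is a matter of changing a level, with no need to touch the individual log statements.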

If we apply these two ideas, we get the code below:

In [14]:
import logging
import pandas as pd

# Name our logger so we know that logs from this module come from the data_ingestion module
logger = logging.getLogger('data_ingestion')

# Set a basic logging message up that prints out a timestamp, the name of our logger, and the message
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')


def create_db_engine(db_path):
    try:
        engine = create_engine(db_path)
        # Test connection
        with engine.connect() as conn:
            pass
        # test if the database engine was created successfully
        logger.info("Database engine created successfully.")
        return engine # Return the engine object if it all works well
    except ImportError as e:  # If we get an ImportError, inform the user SQLAlchemy is not installed
        logger.error("SQLAlchemy is required to use this function. Please install it first.")
        raise e
    except Exception as e:# If we fail to create an engine inform the user
        logger.error(f"Failed to create database engine. Error: {e}")
        raise e
    
def query_data(engine, sql_query):
    try:
        with engine.connect() as connection:
            df = pd.read_sql_query(text(sql_query), connection)
        if df.empty:
            # Log a message or handle the empty DataFrame scenario as needed
            logger.error("The query returned an empty DataFrame.")
            raise ValueError("The query returned an empty DataFrame.")
        logger.info("Query executed successfully.")
        return df
    except ValueError as e: 
        logger.error(f"SQL query failed. Error: {e}")
        raise e
    except Exception as e:
        logger.error(f"An error occurred while querying the database. Error: {e}")
        raise e

Now when we run the incorrect query again:

In [15]:
SQL_engine = create_db_engine('sqlite:///Maji_Ndogo_farm_survey_small.db')

sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
WHERE Rainfall < 0 
"""
# The last line won't ever be true, so no results will be returned. 

df = query_data(SQL_engine, sql_query)
df

2024-03-01 18:05:12,994 - data_ingestion - INFO - Database engine created successfully.
2024-03-01 18:05:12,998 - data_ingestion - ERROR - The query returned an empty DataFrame.
2024-03-01 18:05:12,998 - data_ingestion - ERROR - SQL query failed. Error: The query returned an empty DataFrame.


ValueError: The query returned an empty DataFrame.

Next up, let's include the CSV data handling. This is the original code: 

In [16]:
weather_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv"
weather_mapping_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"


def read_from_web_CSV(URL):
    try:
        df = pd.read_csv(URL)
        logger.info("CSV file read successfully from the web.")
        return df
    except pd.errors.EmptyDataError as e:
        logger.error("The URL does not point to a valid CSV file. Please check the URL and try again.")
        raise e
    except Exception as e:
        logger.error(f"Failed to read CSV from the web. Error: {e}")
        raise e
    
    
weather_df = read_from_web_CSV(weather_data_URL)
weather_mapping_data = read_from_web_CSV(weather_mapping_data_URL)

2024-03-01 18:12:25,897 - data_ingestion - INFO - CSV file read successfully from the web.
2024-03-01 18:12:26,725 - data_ingestion - INFO - CSV file read successfully from the web.


Great! Now our code can connect to a database for the field data, use a query to retrieve data and create a DataFrame. We can also import CSV files from a URL into a DataFrame, and avoid pulling unexpected data. If we put all this code together we have the basic structure of our module. 

In [17]:

from sqlalchemy import create_engine, text
import logging
import pandas as pd

# Name our logger so we know that logs from this module come from the data_ingestion module
logger = logging.getLogger('data_ingestion')
# Set a basic logging message up that prints out a timestamp, the name of our logger, and the message
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'

sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""

weather_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv"
weather_mapping_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"

def create_db_engine(db_path):
    try:
        engine = create_engine(db_path)
        # Test connection
        with engine.connect() as conn:
            pass
        # test if the database engine was created successfully
        logger.info("Database engine created successfully.")
        return engine # Return the engine object if it all works well
    except ImportError as e:  # If we get an ImportError, inform the user SQLAlchemy is not installed
        logger.error("SQLAlchemy is required to use this function. Please install it first.")
        raise e
    except Exception as e:# If we fail to create an engine inform the user
        logger.error(f"Failed to create database engine. Error: {e}")
        raise e
    
def query_data(engine, sql_query):
    try:
        with engine.connect() as connection:
            df = pd.read_sql_query(text(sql_query), connection)
        if df.empty:
            # Log a message or handle the empty DataFrame scenario as needed
            msg = "The query returned an empty DataFrame."
            logger.error(msg)
            raise ValueError(msg)
        logger.info("Query executed successfully.")
        return df
    except ValueError as e: 
        logger.error(f"SQL query failed. Error: {e}")
        raise e
    except Exception as e:
        logger.error(f"An error occurred while querying the database. Error: {e}")
        raise e
    
def read_from_web_CSV(URL):
    try:
        df = pd.read_csv(URL)
        logger.info("CSV file read successfully from the web.")
        return df
    except pd.errors.EmptyDataError as e:
        logger.error("The URL does not point to a valid CSV file. Please check the URL and try again.")
        raise e
    except Exception as e:
        logger.error(f"Failed to read CSV from the web. Error: {e}")
        raise e


In [18]:
# Testing module functions  
field_df = query_data(create_db_engine(db_path), sql_query)   
weather_df = read_from_web_CSV(weather_data_URL)
weather_mapping_df = read_from_web_CSV(weather_mapping_data_URL)

2024-03-01 18:13:51,424 - data_ingestion - INFO - Database engine created successfully.
2024-03-01 18:13:51,493 - data_ingestion - INFO - Query executed successfully.
2024-03-01 18:13:52,257 - data_ingestion - INFO - CSV file read successfully from the web.
2024-03-01 18:13:52,945 - data_ingestion - INFO - CSV file read successfully from the web.


## Field data processor

Next up, let's process the field data.

Here our approach needs to be a bit different. If we create a module with a bunch of functions, we would have to keep passing the DataFrame from one function to the next. Instead, we're going to build a class called `FieldDataProcessor` that encapsulates the whole processing workflow for the field-related data. In the class, we will create a DataFrame attribute and methods that alter that attribute. Because all of the logic is encapsulated in `FieldDataProcessor`, the details are abstracted away and we only need to call something like `FieldDataProcessor.process()`.

We could even include inheritance and polymorphism by creating a `DataProcessor` superclass with `FieldDataProcessor` and `WeatherDataProcessor` as subclasses. But there is a good reason not to. The handling of the field data is quite different from the handling of the weather data: the field data comes from an SQL database and is transformed in a particular way, while the weather data is sourced from a CSV and processed differently.

Since these two processes don't share processing steps, it makes more sense to make a separate class for each.

So let's create the class framework:

In [20]:
import pandas as pd
from data_procrssing.data_ingestion import create_db_engine, query_data, read_from_web_CSV
import logging

class FieldDataProcessor:
    
    def __init__(self, logging_level="INFO"): # When we instantiate this class, we can optionally specify what logs we want to see
       
        # Initialising class with attributes we need. Refer to the code above to understand how each attribute relates to the code
        self.db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'
        self.sql_query = """
            SELECT *
            FROM geographic_features
            LEFT JOIN weather_features USING (Field_ID)
            LEFT JOIN soil_and_crop_features USING (Field_ID)
            LEFT JOIN farm_management_features USING (Field_ID)
            """
        self.columns_to_rename = {'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'}
        self.values_to_rename = {'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'}
        self.weather_map_data = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"

        self.initialize_logging(logging_level)
        
        # We create empty objects to store the DataFrame and engine in
        self.df = None
        self.engine = None
        
    # This method enables logging in the class. 
    def initialize_logging(self, logging_level):

        logger_name = __name__ + ".FieldDataProcessor"
        self.logger = logging.getLogger(logger_name)
        self.logger.propagate = False  # Prevents log messages from being propagated to the root logger

        # Set logging level
        if logging_level.upper() == "DEBUG":
            log_level = logging.DEBUG
        elif logging_level.upper() == "INFO":
            log_level = logging.INFO
        elif logging_level.upper() == "NONE":  # Option to disable logging
            self.logger.disabled = True
            return
        else:
            log_level = logging.INFO  # Default to INFO

        self.logger.setLevel(log_level)

        # Only add handler if not already added to avoid duplicate messages
        if not self.logger.handlers:
            ch = logging.StreamHandler()  # Create console handler
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)

        # Use self.logger.info(), self.logger.debug(), etc.


    # DataFrame methods 
    def ingest_sql_data(self):
        # First we want to get the data from the SQL database
        pass
    
    def rename_columns(self):
        # Annual_yield and Crop_type must be swapped
        pass

    def apply_corrections(self):
        # Correct the crop strings, Eg: 'cassaval' -> 'cassava'
        pass

    def weather_station_mapping(self):
        # Merge the weather station data to the main DataFrame
        pass

    def process(self):
        # This process calls the correct methods and applies the changes, step by step. This is the method we will call, and it will call the other methods in order
        
        weather_map_df = self.weather_station_mapping() 
        self.df = self.ingest_sql_data()
        self.df = self.rename_columns()
        self.df = self.apply_corrections()
        self.df = self.df.merge(weather_map_df, on='Field_ID', how='left')
        self.df = self.df.drop(columns="Unnamed: 0")

Now we need to develop the class methods.

In [21]:
import pandas as pd
from data_procrssing.data_ingestion import create_db_engine, query_data, read_from_web_CSV
import logging


class FieldDataProcessor:
   
    def __init__(self, config_params, logging_level="INFO"):  # Make sure to add this line, passing in config_params to the class 
        self.db_path = config_params['db_path']
        self.sql_query = config_params['sql_query']
        self.columns_to_rename = config_params['columns_to_rename']
        self.values_to_rename = config_params['values_to_rename']
        self.weather_map_data = config_params['weather_mapping_csv']
        
        self.initialize_logging(logging_level)

        # We create empty objects to store the DataFrame and engine in
        self.df = None
        self.engine = None
    

    # This method enables logging in the class.
    def initialize_logging(self, logging_level):        
        logger_name = __name__ + ".FieldDataProcessor"
        self.logger = logging.getLogger(logger_name)
        self.logger.propagate = False  # Prevents log messages from being propagated to the root logger

        # Set logging level
        if logging_level.upper() == "DEBUG":
            log_level = logging.DEBUG
        elif logging_level.upper() == "INFO":
            log_level = logging.INFO
        elif logging_level.upper() == "NONE":  # Option to disable logging
            self.logger.disabled = True
            return
        else:
            log_level = logging.INFO  # Default to INFO

        self.logger.setLevel(log_level)

        # Only add handler if not already added to avoid duplicate messages
        if not self.logger.handlers:
            ch = logging.StreamHandler()  # Create console handler
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)

        # Use self.logger.info(), self.logger.debug(), etc.
            

    # let's focus only on this part from now on
    def ingest_sql_data(self):
        self.engine = create_db_engine(self.db_path)
        self.df = query_data(self.engine, self.sql_query)
        self.logger.info("Sucessfully loaded data.")
        return self.df


    def rename_columns(self):
        # Extract the columns to rename from the configuration
        column1, column2 = list(self.columns_to_rename.keys())[0], list(self.columns_to_rename.values())[0]       
        # Temporarily rename one of the columns to avoid a naming conflict
        temp_name = "__temp_name_for_swap__"
        while temp_name in self.df.columns:
            temp_name += "_"
        # Perform the swap
        self.df = self.df.rename(columns={column1: temp_name, column2: column1})
        self.df = self.df.rename(columns={temp_name: column2})
        self.logger.info(f"Swapped columns: {column1} with {column2}")
        return self.df
        
            
    def apply_corrections(self, column_name='Crop_type', abs_column='Elevation'):
        self.df[abs_column] = self.df[abs_column].abs()
        self.df[column_name] = self.df[column_name].apply(lambda crop: self.values_to_rename.get(crop, crop))
        return self.df

    
    def weather_station_mapping(self):
        return read_from_web_CSV(self.weather_map_data)
    
    
    def process(self):
        self.df = self.ingest_sql_data()
        self.df = self.rename_columns()
        self.df = self.apply_corrections()
        weather_map_df = self.weather_station_mapping() 
        self.df = self.df.merge(weather_map_df, on='Field_ID', how='left')
        self.df = self.df.drop(columns="Unnamed: 0")

Here is how it works.

In [25]:
config_params = {
    "sql_query": """
                SELECT *
                FROM geographic_features
                LEFT JOIN weather_features USING (Field_ID)
                LEFT JOIN soil_and_crop_features USING (Field_ID)
                LEFT JOIN farm_management_features USING (Field_ID)
            """,
    "db_path": 'sqlite:///Maji_Ndogo_farm_survey_small.db', # Insert the db_path of the database
    "columns_to_rename": {'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'},  # Insert the dictionary of columns whose names we want to swap
    "values_to_rename": {'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'}, # Insert the croptype renaming dictionary
    "weather_csv_path": "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv", # Insert the weather data CSV here
    "weather_mapping_csv":"https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv", # Insert the weather data mapping CSV here
}

field_processor = FieldDataProcessor(config_params)
field_processor.process()

field_df = field_processor.df
field_df['Weather_station'].unique()

2024-03-01 21:27:45,032 - data_ingestion - INFO - Database engine created successfully.
2024-03-01 21:27:45,104 - data_ingestion - INFO - Query executed successfully.
2024-03-01 21:27:45,104 - __main__.FieldDataProcessor - INFO - Sucessfully loaded data.
2024-03-01 21:27:45,109 - __main__.FieldDataProcessor - INFO - Swapped columns: Annual_yield with Crop_type
2024-03-01 21:27:45,804 - data_ingestion - INFO - CSV file read successfully from the web.


array([4, 0, 1, 2, 3], dtype=int64)

<br> 

### Creating `field_data_processor.py`

Now we have a robust data processing class for the field-related data. Our final step is to create the module file and document the code.

<br>

## Weather data processor

Now for the last module. The `WeatherDataProcessor` class will be dealing with all of the weather-related data. Again we want to instantiate the class, then call a `.process()` method to import and clean the data. Here is the code we used last time:

In [26]:
import re
import numpy as np
import pandas as pd
import logging
from data_procrssing.data_ingestion import read_from_web_CSV


class WeatherDataProcessor:
    """
    A class for processing weather station data.

    Parameters:
    - config_params (dict): Configuration parameters for data processing.
    - logging_level (str, optional): Logging level for the class (default is "INFO").

    Attributes:
    - weather_station_data (str): CSV file path for weather station data.
    - patterns (dict): Regular expression patterns for extracting measurements from messages.
    - weather_df (pd.DataFrame): DataFrame to store weather station data.
    - logger (logging.Logger): Logger object for logging messages.

    Methods:
    - initialize_logging(logging_level): Set up logging for the instance.
    - weather_station_mapping(): Load weather station data from the web.
    - extract_measurement(message): Extract measurements from a given message using regex patterns.
    - process_messages(): Process messages in the DataFrame to extract measurements.
    - calculate_means(): Calculate mean values for each weather station and measurement.
    - process(): Execute all methods in the correct order for data processing.
    """
    def __init__(self, config_params, logging_level="INFO"):  # Now we're passing in the config_params dictionary already
        """
        Initialize the WeatherDataProcessor instance.

        Parameters:
        - config_params (dict): Configuration parameters for data processing.
        - logging_level (str, optional): Logging level for the class (default is "INFO").
        """
        self.weather_station_data = config_params['weather_csv_path']
        self.patterns = config_params['regex_patterns']
        self.weather_df = None  # Initialize weather_df as None or as an empty DataFrame
        self.initialize_logging(logging_level)

    def initialize_logging(self, logging_level):
        """
        Set up logging for this instance of WeatherDataProcessor.

        Parameters:
        - logging_level (str): Logging level for the class.
        """
        logger_name = __name__ + ".WeatherDataProcessor"
        self.logger = logging.getLogger(logger_name)
        self.logger.propagate = False  # Prevents log messages from being propagated to the root logger

        # Set logging level
        if logging_level.upper() == "DEBUG":
            log_level = logging.DEBUG
        elif logging_level.upper() == "INFO":
            log_level = logging.INFO
        elif logging_level.upper() == "NONE":  # Option to disable logging
            self.logger.disabled = True
            return
        else:
            log_level = logging.INFO  # Default to INFO

        self.logger.setLevel(log_level)

        # Only add handler if not already added to avoid duplicate messages
        if not self.logger.handlers:
            ch = logging.StreamHandler()  # Create console handler
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)

    def weather_station_mapping(self):
        """
        Load weather station data from the web.

        Returns:
        - pd.DataFrame: Weather station data DataFrame.
        """
        self.weather_df = read_from_web_CSV(self.weather_station_data)
        self.logger.info("Successfully loaded weather station data from the web.") 
        # Here, you can apply any initial transformations to self.weather_df if necessary.

    
    def extract_measurement(self, message):
        """
        Extract measurements from a given message using regex patterns.

        Parameters:
        - message (str): The message containing weather measurements.

        Returns:
        - tuple: (measurement key, value), or (None, None) if no match is found.
        """
        for key, pattern in self.patterns.items():
            match = re.search(pattern, message)
            if match:
                self.logger.debug(f"Measurement extracted: {key}")
                return key, float(next((x for x in match.groups() if x is not None)))
        self.logger.debug("No measurement match found.")
        return None, None

    def process_messages(self):
        """
        Process messages in the DataFrame to extract measurements.

        Returns:
        - pd.DataFrame: Processed DataFrame with extracted measurements.
        """
        if self.weather_df is not None:
            result = self.weather_df['Message'].apply(self.extract_measurement)
            self.weather_df['Measurement'], self.weather_df['Value'] = zip(*result)
            self.logger.info("Messages processed and measurements extracted.")
        else:
            self.logger.warning("weather_df is not initialized, skipping message processing.")
        return self.weather_df

    def calculate_means(self):
        """
        Calculate mean values for each weather station and measurement.

        Returns:
        - pd.DataFrame or None: DataFrame with mean values or None if weather_df is not initialized.
        """
        if self.weather_df is not None:
            means = self.weather_df.groupby(by=['Weather_station_ID', 'Measurement'])['Value'].mean()
            self.logger.info("Mean values calculated.")
            return means.unstack()
        else:
            self.logger.warning("weather_df is not initialized, cannot calculate means.")
            return None
    
    def process(self):
        """
        Execute all methods in the correct order for data processing.

        Returns:
        - None
        """
        self.weather_station_mapping()  # Load and assign data to weather_df
        self.process_messages()  # Process messages to extract measurements
        self.logger.info("Data processing completed.")



Here is an example of using the class.

In [27]:
import re
import numpy as np
import pandas as pd
# from field_data_processor import FieldDataProcessor
from data_procrssing.weather_data_processor import WeatherDataProcessor
import logging 

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

config_params = {
    # Paste your previous dictionary data in here
     "sql_query": """
                SELECT *
                FROM geographic_features
                LEFT JOIN weather_features USING (Field_ID)
                LEFT JOIN soil_and_crop_features USING (Field_ID)
                LEFT JOIN farm_management_features USING (Field_ID)
            """, # Insert your SQL query
    "db_path": 'sqlite:///Maji_Ndogo_farm_survey_small.db', # Insert the db_path of the database
    "columns_to_rename": {'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'},# Insert the disctionary of columns we want to swop the names of, 
    "values_to_rename": {'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'}, # Insert the croptype renaming dictionary
    "weather_csv_path": "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv", # Insert the weather data CSV here
    "weather_mapping_csv":"https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv", # Insert the weather data mapping CSV here

    # Add two new keys
    "regex_patterns" :  {
    'Rainfall': r'(\d+(\.\d+)?)\s?mm',
     'Temperature': r'(\d+(\.\d+)?)\s?C',
    'Pollution_level': r'=\s*(-?\d+(\.\d+)?)|Pollution at \s*(-?\d+(\.\d+)?)'
    } # Insert the regex pattern we used to process the messages
}
# Ignoring the field data for now.
# field_processor = FieldDataProcessor(config_params)
# field_processor.process()
# field_df = field_processor.df

weather_processor = WeatherDataProcessor(config_params)
weather_processor.process()
weather_df = weather_processor.weather_df

weather_df['Measurement'].unique()

2024-03-01 22:01:51,394 - data_ingestion - INFO - CSV file read successfully from the web.
2024-03-01 22:01:51,394 - data_procrssing.weather_data_processor.WeatherDataProcessor - INFO - Successfully loaded weather station data from the web.
2024-03-01 22:01:51,454 - data_procrssing.weather_data_processor.WeatherDataProcessor - INFO - Messages processed and measurements extracted.
2024-03-01 22:01:51,455 - data_procrssing.weather_data_processor.WeatherDataProcessor - INFO - Data processing completed.


array(['Temperature', 'Pollution_level', 'Rainfall'], dtype=object)
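
To make the extraction and averaging steps concrete, here is a minimal sketch that applies the same regex patterns to a handful of messages and then computes per-station means the way `calculate_means()` does. The messages and station IDs in `toy_df` are made up for illustration; they are not taken from `Weather_station_data.csv`, and the standalone `extract_measurement` function is a simplified copy of the class method without logging.

```python
import re
import pandas as pd

# Same patterns as config_params["regex_patterns"]
patterns = {
    'Rainfall': r'(\d+(\.\d+)?)\s?mm',
    'Temperature': r'(\d+(\.\d+)?)\s?C',
    'Pollution_level': r'=\s*(-?\d+(\.\d+)?)|Pollution at \s*(-?\d+(\.\d+)?)',
}

def extract_measurement(message):
    """Return (measurement name, value) for the first pattern that matches."""
    for key, pattern in patterns.items():
        match = re.search(pattern, message)
        if match:
            # The first non-None capture group holds the full number
            return key, float(next(x for x in match.groups() if x is not None))
    return None, None

# Hypothetical messages (assumption: the real messages may be worded differently)
toy_df = pd.DataFrame({
    'Weather_station_ID': [0, 0, 1, 1, 1],
    'Message': [
        'Rainfall measured: 12.5 mm',
        'Temperature reading of 21.0 C',
        'Temperature reading of 19.0 C',
        'Temperature reading of 23.0 C',
        'Pollution at 0.45',
    ],
})

# Same two steps as process_messages() and calculate_means()
result = toy_df['Message'].apply(extract_measurement)
toy_df['Measurement'], toy_df['Value'] = zip(*result)
toy_means = (
    toy_df.groupby(by=['Weather_station_ID', 'Measurement'])['Value']
    .mean()
    .unstack()
)
print(toy_means)
```

After `unstack()`, each row is a weather station and each column is a measurement type; a station with no messages for a given measurement gets `NaN` in that column.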

### Validating our data pipeline

So we finally have working modules that automatically pull data from the database (or the web), process it, clean it, and return our starting DataFrame. Before we jump in and analyse the data, let's pause for a second and ask: Did the changes actually get applied? Did we correct the elevation data? Did we rename the columns? We could go back to the old ways and write queries to check, but a better way is to **test our dataset**.
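
As a rough idea of what "testing our dataset" could look like, the sketch below runs a few assertions against a small stand-in DataFrame. The `sample_df` values and the `check_field_df` helper are hypothetical, and the elevation check assumes the correction was to make all elevations non-negative; the crop-name and column checks follow the `values_to_rename` and `columns_to_rename` entries in `config_params`.

```python
import pandas as pd

# Hypothetical stand-in for the cleaned field DataFrame (values are made up)
sample_df = pd.DataFrame({
    'Field_ID': [1, 2, 3],
    'Elevation': [786.1, 674.3, 826.5],
    'Crop_type': ['cassava', 'wheat', 'tea'],
    'Annual_yield': [0.75, 1.07, 0.57],
})

def check_field_df(df):
    """Raise AssertionError if an expected cleaning step was not applied."""
    # Assumption: corrected elevations should never be negative
    assert (df['Elevation'] >= 0).all(), "Negative elevation values found"

    # The misspelled crop names from values_to_rename should be gone
    misspelled = {'cassaval', 'wheatn', 'teaa'}
    assert not df['Crop_type'].isin(misspelled).any(), "Misspelled crop names found"

    # After swapping the column names, Annual_yield is numeric and Crop_type is text
    assert pd.api.types.is_numeric_dtype(df['Annual_yield']), "Annual_yield is not numeric"
    assert not pd.api.types.is_numeric_dtype(df['Crop_type']), "Crop_type is numeric"
    return True

print(check_field_df(sample_df))
```

The same checks could be run on the real `field_df` once it is loaded, or moved into a test file so they run automatically whenever the pipeline changes.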

Let's get the data in first. Remember to use your `config_params` dictionary.

In [32]:
import re
import numpy as np
import pandas as pd
from data_procrssing.field_data_processor import FieldDataProcessor
from data_procrssing.weather_data_processor import WeatherDataProcessor
import logging 

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

config_params = {
     "sql_query": """
                SELECT *
                FROM geographic_features
                LEFT JOIN weather_features USING (Field_ID)
                LEFT JOIN soil_and_crop_features USING (Field_ID)
                LEFT JOIN farm_management_features USING (Field_ID)
            """,
    "db_path": 'sqlite:///Maji_Ndogo_farm_survey_small.db',
    "columns_to_rename": {'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'},
    "values_to_rename": {'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'},
    "weather_csv_path": "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv",
    "weather_mapping_csv":"https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv",
    "regex_patterns" :  {
    'Rainfall': r'(\d+(\.\d+)?)\s?mm',
     'Temperature': r'(\d+(\.\d+)?)\s?C',
    'Pollution_level': r'=\s*(-?\d+(\.\d+)?)|Pollution at \s*(-?\d+(\.\d+)?)'
    }
}

field_processor = FieldDataProcessor(config_params)
field_processor.process()
field_df = field_processor.df

weather_processor = WeatherDataProcessor(config_params)
weather_processor.process()
weather_df = weather_processor.weather_df

# Rename 'Ave_temps' in field_df to 'Temperature' to match weather_df
field_df.rename(columns={'Ave_temps': 'Temperature'}, inplace=True)

2024-03-01 22:16:22,214 - data_ingestion - INFO - Database engine created successfully.
2024-03-01 22:16:22,284 - data_ingestion - INFO - Query executed successfully.
2024-03-01 22:16:22,284 - data_procrssing.field_data_processor.FieldDataProcessor - INFO - Sucessfully loaded data.
2024-03-01 22:16:22,287 - data_procrssing.field_data_processor.FieldDataProcessor - INFO - Swapped columns: Annual_yield with Crop_type
2024-03-01 22:16:23,037 - data_ingestion - INFO - CSV file read successfully from the web.
2024-03-01 22:16:24,208 - data_ingestion - INFO - CSV file read successfully from the web.
2024-03-01 22:16:24,209 - data_procrssing.weather_data_processor.WeatherDataProcessor - INFO - Successfully loaded weather station data from the web.
2024-03-01 22:16:24,228 - data_procrssing.weather_data_processor.WeatherDataProcessor - INFO - Messages processed and measurements extracted.
2024-03-01 22:16:24,229 - data_procrssing.weather_data_processor.WeatherDataProcessor - INFO - Data process