<div align="center" style=" font-size: 80%; text-align: center; margin: 0 auto">
<img src="https://raw.githubusercontent.com/Explore-AI/Pictures/master/Python-Notebook-Banners/Code_challenge.png"  style="display: block; margin-left: auto; margin-right: auto;";/>
</div>

# Integrated project: Validating our data
© ExploreAI Academy

In this Code Challenge we’re diving into the agricultural dataset again to continue to validate our data. Before we do that, we’re pausing to build a data pipeline that will ingest and clean our data with the press of a button, cleaning up our code significantly. Once that’s ready, we’ll complete our data validation.

⚠️ **NOTE that this code challenge is graded and will contribute to your overall marks for this module. Submit this notebook for grading. Note that the names of the functions are different in this notebook. Transfer the code in your notebook to this submission notebook**

### Instructions

- **Do not add or remove cells in this notebook. Do not edit or remove the `### START FUNCTION` or `### END FUNCTION` comments. Do not add any code outside of the functions you are required to edit. Doing any of this will lead to a mark of 0%!**

- Answer the questions according to the specifications provided.

- Use the given cell in each question to see if your function matches the expected outputs.

- Do not hard-code answers to the questions.

- The use of StackOverflow, Google, and other online tools is permitted. However, copying a fellow student's code is not permissible and is considered a breach of the Honour code. Doing this will result in a mark of 0%.

# Introduction

Let's pick up where we left off, shall we?

Recall our previous disappointment - the mismatch between our dataset and the weather station data? That was a curveball, wasn't it? Half of our measurements were out of range, raising eyebrows and doubts alike. Our quest for data validation had hit a snag, but as any seasoned data scientist will tell you, every problem is a hidden opportunity waiting to be discovered.

I'm sure you have some ideas on how to solve this, so I'll share mine...

One thing that bugs me is the tolerance level I chose. Why 1.5%? 

We saw that half of our means were not within that tolerance, but why would it be within 1.5% and not 5%?

Although we may think 1.5% is a good measurement of accuracy, there may be errors in each dataset, and more variation in one than the other, which could make the means differ by more than 1.5% even if the objective truth is that the means are the same. We need to be a little more scientific with our approach and account for the difference in the data. I am sure you know what I am about to say next, right?

**Hypothesis testing** takes into account both the means and the variances of the distributions being compared. The variance here is crucial because it gives us insight into the spread of the data points around the means for our two datasets. Two samples could have the same mean but very different variances, leading to different interpretations of their similarities or differences.

Our main goal is the same: Is the data in our `MD_agric_df` dataset representative of reality? To answer this, we use weather-related data from nearby stations to validate our results. If the weather data matches the data we have, we can be more confident that our dataset represents reality. 

So what's the plan? 
1. Create a null hypothesis.
1. Import the `MD_agric_df` dataset and clean it up.
1. Import the weather data.
1. Map the weather data to the field data.
1. Calculate the means of the weather station dataset and the means of the main dataset.
2. Calculate all the parameters we need to do a t-test. 
3. Interpret our results.

Do you see a bit of repetition here? Steps 2-5 are a repeat of last time. The quick and dirty fix is to copy that code across to our notebook, but what about next time and the time after that? All of that code will also clutter this notebook and make our analysis harder to read.

You might also think of exporting the fixed and merged data from last time to a CSV, but what if there is new data in the database?

So, let's stop for a second and think about how we can make this simpler, more extendible and reusable in the future. 

# Data dictionary

**1. Geographic features**

- **Field_ID:** A unique identifier for each field (BigInt).
 
- **Elevation:** The elevation of the field above sea level in metres (Float).

- **Latitude:** Geographical latitude of the field in degrees (Float).

- **Longitude:** Geographical longitude of the field in degrees (Float).

- **Location:** Province the field is in (Text).

- **Slope:** The slope of the land in the field (Float).

**2. Weather features**

- **Field_ID:** Corresponding field identifier (BigInt).

- **Rainfall:** Amount of rainfall in the area in mm (Float).

- **Min_temperature_C:** Average minimum temperature recorded in Celsius (Float).

- **Max_temperature_C:** Average maximum temperature recorded in Celsius (Float).

- **Ave_temps:** Average temperature in Celcius (Float).

**3. Soil and crop features**

- **Field_ID:** Corresponding field identifier (BigInt).

- **Soil_fertility:** A measure of soil fertility where 0 is infertile soil, and 1 is very fertile soil (Float).

- **Soil_type:** Type of soil present in the field (Text).

- **pH:** pH level of the soil, which is a measure of how acidic/basic the soil is (Float).

**4. Farm management features**

- **Field_ID:** Corresponding field identifier (BigInt).

- **Pollution_level:** Level of pollution in the area where 0 is unpolluted and 1 is very polluted (Float).

- **Plot_size:** Size of the plot in the field (Ha) (Float).

- **Chosen_crop:** Type of crop chosen for cultivation (Text).

- **Annual_yield:** Annual yield from the field (Float). This is the total output of the field. The field size and type of crop will affect the Annual Yield

- **Standard_yield:** Standardised yield expected from the field, normalised per crop (Float). This is independent of field size, or crop type. Multiplying this number by the field size, and average crop yield will give the Annual_Yield.

<br>

**Weather_station_data (CSV)**

- **Weather_station_ID:** The weather station the data originated from. (Int)

- **Message:** The weather data was captured by sensors at the stations, in the format of text messages.(Str)

**Weather_data_field_mapping (CSV)**

- **Field_ID:** The id of the field that is connected to a weather station. This is the key we can use to join the weather station ID to the original data. (Int)

- **Weather_station_ID:** The weather station that is connected to a field. If a field has `weather_station_ID = 0` then that field is closest to weather station 0. (Int)

<br>

# Dealing with a friendly warning from Pandas

If you are running this notebook in `Python 3.12` or later, you might get a warning if you run the imports below:

In [1]:
import re
import numpy as np
import pandas as pd
import os

If you are lucky enough to see this warning, it let's us know that Pandas is changing soon and will require another package to be installed. 

```python
...2334042735.py:3: DeprecationWarning: 
Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd
```

We can safely ignore these warnings, but soon our script will fail to import Pandas, so let's fix it today, and we won't have to worry about it for a long time. The warning tells us that Pyarrow will soon be a requirement to import Pandas, so we can just install it with pip. 

In [3]:
%pip install Pyarrow

Collecting Pyarrow
  Downloading pyarrow-15.0.0-cp312-cp312-win_amd64.whl.metadata (3.1 kB)
Downloading pyarrow-15.0.0-cp312-cp312-win_amd64.whl (25.3 MB)
   ---------------------------------------- 0.0/25.3 MB ? eta -:--:--
   ---------------------------------------- 0.0/25.3 MB ? eta -:--:--
   ---------------------------------------- 0.0/25.3 MB ? eta -:--:--
   ---------------------------------------- 0.0/25.3 MB ? eta -:--:--
   ---------------------------------------- 0.0/25.3 MB ? eta -:--:--
   ---------------------------------------- 0.0/25.3 MB ? eta -:--:--
   ---------------------------------------- 0.0/25.3 MB 660.6 kB/s eta 0:00:39
   ---------------------------------------- 0.0/25.3 MB 660.6 kB/s eta 0:00:39
   ---------------------------------------- 0.0/25.3 MB 219.4 kB/s eta 0:01:55
   ---------------------------------------- 0.0/25.3 MB 219.4 kB/s eta 0:01:55
   ---------------------------------------- 0.0/25.3 MB 219.4 kB/s eta 0:01:55
   ---------------------------




   ---------------------------- ----------- 17.9/25.3 MB 59.7 kB/s eta 0:02:04
   ---------------------------- ----------- 17.9/25.3 MB 59.7 kB/s eta 0:02:04
   ---------------------------- ----------- 17.9/25.3 MB 59.7 kB/s eta 0:02:04
   ---------------------------- ----------- 17.9/25.3 MB 59.7 kB/s eta 0:02:04
   ---------------------------- ----------- 17.9/25.3 MB 59.5 kB/s eta 0:02:04
   ---------------------------- ----------- 17.9/25.3 MB 59.5 kB/s eta 0:02:04
   ---------------------------- ----------- 17.9/25.3 MB 59.5 kB/s eta 0:02:04
   ---------------------------- ----------- 17.9/25.3 MB 59.5 kB/s eta 0:02:04
   ---------------------------- ----------- 18.0/25.3 MB 60.9 kB/s eta 0:02:01
   ---------------------------- ----------- 18.0/25.3 MB 60.9 kB/s eta 0:02:01
   ---------------------------- ----------- 18.0/25.3 MB 61.1 kB/s eta 0:02:00
   ---------------------------- ----------- 18.0/25.3 MB 61.1 kB/s eta 0:02:00
   ---------------------------- ----------- 18.0/25

# Cleaning up our data pipeline

When we pulled in the data last time, we actually made an assumption that our script worked. We assumed that all the fixes we made to the data actually made it in, and assumed that the database didn't change. But what if someone added more records, fixed the data on the database that we were making in our notebook, or added a new column of data? 

Last time we started to build what's known as a **data pipeline**. We often do this as data scientists. The data we work with is almost always stored in databases, and we don't want to transform, clean up or make changes to the database unless it is really beneficial. So we retrieve the data from the database, and in our workspace, we shape the data into a useful format for our goal. 

Last time we imported data with this code: (⚠️ Don't run it)

In [None]:
sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""

# Create a connection object
with engine.connect() as connection:
    
    # Use Pandas to execute the query and store the result in a DataFrame
    MD_agric_df = pd.read_sql_query(text(sql_query), connection)

In [None]:
MD_agric_df.rename(columns={'Annual_yield': 'Crop_type_Temp', 'Crop_type': 'Annual_yield'}, inplace=True)
MD_agric_df.rename(columns={'Crop_type_Temp': 'Crop_type'}, inplace=True)
MD_agric_df['Elevation'] = MD_agric_df['Elevation'].abs()

# Correcting 'Crop_type' column
def correct_crop_type(crop):
    crop = crop.strip()  # Remove trailing spaces
    corrections = {
        'cassaval': 'cassava',
        'wheatn': 'wheat',
        'teaa': 'tea'
    }
    return corrections.get(crop, crop)  # Get the corrected crop type, or return the original if not in corrections

# Apply the correction function to the Crop_type column
MD_agric_df['Crop_type'] = MD_agric_df['Crop_type'].apply(correct_crop_type)

In [None]:
weather_station_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv")
weather_station_mapping_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv")

In [None]:
import re # Importing the regex pattern
import numpy as np


patterns = {
    'Rainfall': r'(\d+(\.\d+)?)\s?mm',
     'Temperature': r'(\d+(\.\d+)?)\s?C',
    'Pollution_level': r'=\s*(-?\d+(\.\d+)?)|Pollution at \s*(-?\d+(\.\d+)?)'
    }

def extract_measurement(message):
    """
    Extracts a numeric measurement value from a given message string.

    The function applies regular expressions to identify and extract
    numeric values related to different types of measurements such as
    Rainfall, Average Temperatures, and Pollution Levels from a text message.
    It returns the key of the matching record, and first matching value as a floating-point number.
    
    Parameters:
    message (str): A string message containing the measurement information.

    Returns:
    float: The extracted numeric value of the measurement if a match is found;
           otherwise, None.

    The function uses the following patterns for extraction:
    - Rainfall: Matches numbers (including decimal) followed by 'mm', optionally spaced.
    - Ave_temps: Matches numbers (including decimal) followed by 'C', optionally spaced.
    - Pollution_level: Matches numbers (including decimal) following 'Pollution at' or '='.
    
    Example usage:
    extract_measurement("【2022-01-04 21:47:48】温度感应: 现在温度是 12.82C.")
    # Returns: 'Temperature', 12.82
    """
    
    for key, pattern in patterns.items(): # Loop through all of the patterns and check if it matches the pattern value.
        match = re.search(pattern, message)
        if match:
            # Extract the first group that matches, which should be the measurement value if all previous matches are empty.
            # print(match.groups()) # Uncomment this line to help you debug your regex patterns.
            return key, float(next((x for x in match.groups() if x is not None)))
    
    return None, None

# The function creates a tuple with the measurement type and value into a Pandas Series
result = weather_station_df['Message'].apply(extract_measurement)

# Create separate columns for 'Measurement' and 'extracted_value' by unpacking the tuple with Lambda functions.
weather_station_df['Measurement'] = result.apply(lambda x: x[0])
weather_station_df['Value'] = result.apply(lambda x: x[1])

In [None]:
# The function creates a tuple with the measurement type and value into a Pandas Series
result = weather_station_df['Message'].apply(extract_measurement)

# Create separate columns for 'Measurement' and 'extracted_value' by unpacking the tuple with Lambda functions.
weather_station_df['Measurement'] = result.apply(lambda x: x[0])
weather_station_df['Value'] = result.apply(lambda x: x[1])

weather_station_means = weather_station_df.groupby(by = ['Weather_station_ID','Measurement'])['Value'].mean(numeric_only = True)
weather_station_means = weather_station_means.unstack()
weather_station_means

In [None]:
# Use this line of code to see which messages are not assigned yet.
weather_station_df[(weather_station_df['Measurement'] == None)|(weather_station_df['Value'].isna())]

In [None]:

MD_agric_df = MD_agric_df.merge(weather_station_mapping_df,on = 'Field_ID', how='left')
MD_agric_df.drop(columns="Unnamed: 0")
MD_agric_df_weather_means = MD_agric_df.groupby("Weather_station").mean(numeric_only = True)[['Pollution_level','Rainfall', 'Ave_temps']]

MD_agric_df_weather_means = MD_agric_df_weather_means.rename(columns = {'Ave_temps':"Temperature"})
MD_agric_df_weather_means

We have to copy that code across from another notebook, and we may miss some blocks or lines of code. We may not copy over the code in the right order, and we may not have documented the code well either. 

Do you also think this is a massive block of code that feels like it is in the way of our analysis? Is it simple to follow and simple to change? 

No! So let's spend a bit of time building a proper data pipeline that imports the data from the different sources we have, cleans up the data, and tests whether our data is what we expect it to be. We want to do all of this in a way that doesn't cause our notebook to be filled with thousands of lines of code and become a nightmare to debug. As a final step, we want to automate a few simple data validation checks in our code.

To do this we're going to re-organise or **refactor** our code into modules. We're going to reorganise all of the code into smaller modules so our code is more readable, maintainable and extendable. 

We can create a module to interact with the database, a module to transform and clean the field-related data and another module to process the weather data.

<br>

So here's the plan: 

1. Gather all of the code from our last "pipeline".

2. Re-organise the code into our new three modules: 

    a. `data_ingesation.py` - All SQL-related functions, and web-based data retrieval.

    b. `field_data_processor.py` - All transformations, cleanup, and merging functionality.

    c. `weather_data_processor.py` - All transformations and cleanup of the weather station data.

3. Copy our code into the modules and test their functionality.

4. Create automated data validation tests to ensure our data is as we expect it to be.

Once we're done with that, we're going to jump into the reason why we're here. So let's get started!

# Modules

Why are we doing this? Well, we want to reduce our data pipeline code to a couple of lines like this: 

In [None]:
field_processor.process()
field_df = field_processor.df

weather_processor.process()
field_df = field_processor.weather_df

We're going to need a bit more code, but we want to press a button and go from databases and CSVs to a Pandas DataFrame. All of the code and processes are happening in the modules and we get the end result. The second big motivator is to make our code more modular. If we want to debug a problem in the field data, we know where to go, and if we want to import other IoT weather sensors we can just modify the weather data module.

The first challenge; automating the data ingestion. There are two places we're fetching data:
1. SQLite database - We need to create an SQLite engine, connect to the database, run a query and return a pandas DataFrame.
2. Web CSV file - Read the CSV data from the web, and import it as a DataFrame.

So let's start building!

## Data ingestion

Ok, so first up, let's grab the code that interacted with the database, and the web CSV files.

SQL:

In [2]:
import pandas as pd # importing the Pandas package with an alias, pd
from sqlalchemy import create_engine, text # Importing the SQL interface. If this fails, run !pip install sqlalchemy in another cell.
import matplotlib.pyplot as plt
import seaborn as sns


# Create an engine for the database
engine = create_engine('sqlite:///Maji_Ndogo_farm_survey_small.db') #Make sure to have the .db file in the same directory as this notebook, and the file name matches.


sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""

# Create a connection object
with engine.connect() as connection:
    
    # Use Pandas to execute the query and store the result in a DataFrame
    MD_agric_df = pd.read_sql_query(text(sql_query), connection)

CSV files: 

In [3]:
weather_station_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv")
weather_station_mapping_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv")

Creating modules in Jupyter notebooks is a bit of a pain. If we make changes to the `module.py` file, we have to restart the notebook kernel and import the module again in order to apply those changes. So, we're going to fully develop it, test it in this notebook, and only then, move it to a `data_ingestion.py` file, and import it. 

To create a module, our code should ideally be encapsulated in functions or classes. How do we choose?

Since the process of connecting to a database, and querying some data is relatively straightforward, functions seemed like the best fit. Functions allow us to encapsulate the necessary steps in clear, reusable blocks of code without the overhead of managing class objects. It's like using a **simple tool** for a specific task — pick it up, use it, and put it back without needing to remember anything about the last use.

On the other hand, our **data processing** modules are a bit more complex. These modules not only perform various operations on the data but also need to keep track of the data as it goes through these processes. For this, we use **classes** to **create DataFrame objects** as attributes. This approach simplifies data handling since we're passing the object around, which inherently knows its data and the operations it can perform within the class.

This idea ties back to OOP. Class objects are designed to deal with data and operations on that data via methods, while functions are made to do the simpler tasks.

So for **data ingestion**, we're just going to use functions.

So our **main task** will be to **convert the data ingestion code into functions** that we can call from the module.

So this code: 

In [4]:
# Create an engine for the database
engine = create_engine('sqlite:///Maji_Ndogo_farm_survey_small.db') #Make sure to have the .db file in the same directory as this notebook, and the file name matches.


sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""

# Create a connection object
with engine.connect() as connection:
    
    # Use Pandas to execute the query and store the result in a DataFrame
    MD_agric_df = pd.read_sql_query(text(sql_query), connection)

Becomes neat, modular functions: 

In [5]:
def create_db_engine(db_path):
    engine = create_engine(db_path)
    return engine

def query_data(engine, sql_query):
    with engine.connect() as connection:
        df = pd.read_sql_query(text(sql_query), connection)
        return df

So if we call:

In [6]:
create_db_engine('sqlite:///Maji_Ndogo_farm_survey_small.db')

Engine(sqlite:///Maji_Ndogo_farm_survey_small.db)

We get the SQL engine object which we can use with the query to connect to the database, and run a query.

In [7]:
SQL_engine = create_db_engine('sqlite:///Maji_Ndogo_farm_survey_small.db')

sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""


df = query_data(SQL_engine, sql_query)
df

Unnamed: 0,Field_ID,Elevation,Latitude,Longitude,Location,Slope,Rainfall,Min_temperature_C,Max_temperature_C,Ave_temps,Soil_fertility,Soil_type,pH,Pollution_level,Plot_size,Crop_type,Annual_yield,Standard_yield
0,40734,786.05580,-7.389911,-7.556202,Rural_Akatsi,14.795113,1125.2,-3.1,33.1,15.00,0.62,Sandy,6.169393,8.526684e-02,1.3,0.751354,cassava,0.577964
1,30629,674.33410,-7.736849,-1.051539,Rural_Sokoto,11.374611,1450.7,-3.9,30.6,13.35,0.64,Volcanic,5.676648,3.996838e-01,2.2,1.069865,cassava,0.486302
2,39924,826.53390,-9.926616,0.115156,Rural_Sokoto,11.339692,2208.9,-1.8,28.4,13.30,0.69,Volcanic,5.331993,3.580286e-01,3.4,2.208801,tea,0.649647
3,5754,574.94617,-2.420131,-6.592215,Rural_Kilimani,7.109855,328.8,-5.8,32.2,13.20,0.54,Loamy,5.328150,2.866871e-01,2.4,1.277635,cassava,0.532348
4,14146,886.35300,-3.055434,-7.952609,Rural_Kilimani,55.007656,785.2,-2.5,31.0,14.25,0.72,Sandy,5.721234,4.319027e-02,1.5,0.832614,wheat,0.555076
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
5649,11472,681.36145,-7.358371,-6.254369,Rural_Akatsi,16.213196,885.7,-4.3,33.4,14.55,0.61,Sandy,5.741063,3.286828e-01,1.1,0.609930,potato,0.554482
5650,19660,667.02120,-3.154559,-4.475046,Rural_Kilimani,2.397553,501.1,-4.8,32.1,13.65,0.54,Sandy,5.445833,1.602583e-01,8.7,3.812289,maize,0.438194
5651,41296,670.77900,-14.472861,-6.110221,Rural_Hawassa,7.636470,1586.6,-3.8,33.4,14.80,0.64,Volcanic,5.385873,8.221326e-09,2.1,1.681629,tea,0.800776
5652,33090,429.48840,-14.653089,-6.984116,Rural_Hawassa,13.944720,1272.2,-6.2,34.6,14.20,0.63,Silt,5.562508,6.917245e-10,1.3,0.659874,cassava,0.507595


We can even call the `create_db_engine()` function inside the `query_data()` function:

In [8]:
sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""


df = query_data(create_db_engine('sqlite:///Maji_Ndogo_farm_survey_small.db'), sql_query)
df

Unnamed: 0,Field_ID,Elevation,Latitude,Longitude,Location,Slope,Rainfall,Min_temperature_C,Max_temperature_C,Ave_temps,Soil_fertility,Soil_type,pH,Pollution_level,Plot_size,Crop_type,Annual_yield,Standard_yield
0,40734,786.05580,-7.389911,-7.556202,Rural_Akatsi,14.795113,1125.2,-3.1,33.1,15.00,0.62,Sandy,6.169393,8.526684e-02,1.3,0.751354,cassava,0.577964
1,30629,674.33410,-7.736849,-1.051539,Rural_Sokoto,11.374611,1450.7,-3.9,30.6,13.35,0.64,Volcanic,5.676648,3.996838e-01,2.2,1.069865,cassava,0.486302
2,39924,826.53390,-9.926616,0.115156,Rural_Sokoto,11.339692,2208.9,-1.8,28.4,13.30,0.69,Volcanic,5.331993,3.580286e-01,3.4,2.208801,tea,0.649647
3,5754,574.94617,-2.420131,-6.592215,Rural_Kilimani,7.109855,328.8,-5.8,32.2,13.20,0.54,Loamy,5.328150,2.866871e-01,2.4,1.277635,cassava,0.532348
4,14146,886.35300,-3.055434,-7.952609,Rural_Kilimani,55.007656,785.2,-2.5,31.0,14.25,0.72,Sandy,5.721234,4.319027e-02,1.5,0.832614,wheat,0.555076
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
5649,11472,681.36145,-7.358371,-6.254369,Rural_Akatsi,16.213196,885.7,-4.3,33.4,14.55,0.61,Sandy,5.741063,3.286828e-01,1.1,0.609930,potato,0.554482
5650,19660,667.02120,-3.154559,-4.475046,Rural_Kilimani,2.397553,501.1,-4.8,32.1,13.65,0.54,Sandy,5.445833,1.602583e-01,8.7,3.812289,maize,0.438194
5651,41296,670.77900,-14.472861,-6.110221,Rural_Hawassa,7.636470,1586.6,-3.8,33.4,14.80,0.64,Volcanic,5.385873,8.221326e-09,2.1,1.681629,tea,0.800776
5652,33090,429.48840,-14.653089,-6.984116,Rural_Hawassa,13.944720,1272.2,-6.2,34.6,14.20,0.63,Silt,5.562508,6.917245e-10,1.3,0.659874,cassava,0.507595


This seems simple, but let's think for a second about what could go wrong. For example, what if there is a problem like the database has a schema, and no actual data? Or our query doesn't return any data.

In [9]:
SQL_engine = create_db_engine('sqlite:///Maji_Ndogo_farm_survey_small.db')

sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
WHERE Rainfall < 0 
"""
# The last line won't ever be true, so no results will be returned. 

df = query_data(SQL_engine, sql_query)
df

Unnamed: 0,Field_ID,Elevation,Latitude,Longitude,Location,Slope,Rainfall,Min_temperature_C,Max_temperature_C,Ave_temps,Soil_fertility,Soil_type,pH,Pollution_level,Plot_size,Crop_type,Annual_yield,Standard_yield


We get an empty DataFrame, because SQL returned an empty query result. When we try to filter results, we get an answer that would not make sense.

In [11]:
df['Rainfall'] > 100

Series([], Name: Rainfall, dtype: bool)

So to avoid this we need to add error handling into our code so that we stop the process if something is wrong, and tell us what the problem is before we continue. 

Secondly, to help us understand how our code is executing we're going to add some logs. While print statements can help us to debug our code, we have to remove them once our code goes into use, one by one. `logging` is a better way to debug our code than print statements because we can add `logging.INFO()` logs to know what our code is doing, and `logging.DEBUG()` statements that have more detail in case we want to debug a specific loop in a bit more in detail. There are also various other tools to use, and we can also silence all logging with a single line of code. If we used print statements, we will have to comment them out one by one. 

If we apply these two ideas, we get the code below:

In [10]:
import logging
import pandas as pd

# Name our logger so we know that logs from this module come from the data_ingestion module
logger = logging.getLogger('data_ingestion')

# Set a basic logging message up that prints out a timestamp, the name of our logger, and the message
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')


def create_db_engine(db_path):
    try:
        engine = create_engine(db_path)
        # Test connection
        with engine.connect() as conn:
            pass
        # test if the database engine was created successfully
        logger.info("Database engine created successfully.")
        return engine # Return the engine object if it all works well
    except ImportError: #If we get an ImportError, inform the user SQLAlchemy is not installed
        logger.error("SQLAlchemy is required to use this function. Please install it first.")
        raise e
    except Exception as e:# If we fail to create an engine inform the user
        logger.error(f"Failed to create database engine. Error: {e}")
        raise e
    
def query_data(engine, sql_query):
    try:
        with engine.connect() as connection:
            df = pd.read_sql_query(text(sql_query), connection)
        if df.empty:
            # Log a message or handle the empty DataFrame scenario as needed
            msg = "The query returned an empty DataFrame."
            logger.error(msg)
            raise ValueError(msg)
        logger.info("Query executed successfully.")
        return df
    except ValueError as e: 
        logger.error(f"SQL query failed. Error: {e}")
        raise e
    except Exception as e:
        logger.error(f"An error occurred while querying the database. Error: {e}")
        raise e

Now when we run the incorrect query again:

In [11]:
SQL_engine = create_db_engine('sqlite:///Maji_Ndogo_farm_survey_small.db')

sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
WHERE Rainfall < 0 
"""
# The last line won't ever be true, so no results will be returned. 

df = query_data(SQL_engine, sql_query)
df

2024-02-24 17:16:17,509 - data_ingestion - INFO - Database engine created successfully.
2024-02-24 17:16:17,524 - data_ingestion - ERROR - The query returned an empty DataFrame.
2024-02-24 17:16:17,527 - data_ingestion - ERROR - SQL query failed. Error: The query returned an empty DataFrame.


ValueError: The query returned an empty DataFrame.

We get a log of what happened, and we now get an error telling us there is something wrong with the DataFrame, and we are prevented from processing it further.

Next up, let's include the CSV data handling. This is the original code: 

In [12]:
weather_station_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv")
weather_station_mapping_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv")

These two files are imported in the same way, so we can use one function to do it.

In [13]:
weather_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv"
weather_mapping_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"


def read_from_web_CSV(URL):
    try:
        df = pd.read_csv(URL)
        logger.info("CSV file read successfully from the web.")
        return df
    except pd.errors.EmptyDataError as e:
        logger.error("The URL does not point to a valid CSV file. Please check the URL and try again.")
        raise e
    except Exception as e:
        logger.error(f"Failed to read CSV from the web. Error: {e}")
        raise e
    
    
weather_df = read_from_web_CSV(weather_data_URL)
weather_mapping_data = read_from_web_CSV(weather_mapping_data_URL)

2024-02-24 17:16:41,128 - data_ingestion - INFO - CSV file read successfully from the web.
2024-02-24 17:16:41,639 - data_ingestion - INFO - CSV file read successfully from the web.


Great! Now our code can connect to a database for the field data, use a query to retrieve data and create a DataFrame. We can also import CSV files from a URL into a DataFrame, and avoid pulling unexpected data. If we put all this code together we have the basic structure of our module. 

In [14]:

from sqlalchemy import create_engine, text
import logging
import pandas as pd

# Name our logger so we know that logs from this module come from the data_ingestion module
logger = logging.getLogger('data_ingestion')
# Set a basic logging message up that prints out a timestamp, the name of our logger, and the message
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'

sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""

weather_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv"
weather_mapping_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"

def create_db_engine(db_path):
    try:
        engine = create_engine(db_path)
        # Test connection
        with engine.connect() as conn:
            pass
        # test if the database engine was created successfully
        logger.info("Database engine created successfully.")
        return engine # Return the engine object if it all works well
    except ImportError: #If we get an ImportError, inform the user SQLAlchemy is not installed
        logger.error("SQLAlchemy is required to use this function. Please install it first.")
        raise e
    except Exception as e:# If we fail to create an engine inform the user
        logger.error(f"Failed to create database engine. Error: {e}")
        raise e
    
def query_data(engine, sql_query):
    try:
        with engine.connect() as connection:
            df = pd.read_sql_query(text(sql_query), connection)
        if df.empty:
            # Log a message or handle the empty DataFrame scenario as needed
            msg = "The query returned an empty DataFrame."
            logger.error(msg)
            raise ValueError(msg)
        logger.info("Query executed successfully.")
        return df
    except ValueError as e: 
        logger.error(f"SQL query failed. Error: {e}")
        raise e
    except Exception as e:
        logger.error(f"An error occurred while querying the database. Error: {e}")
        raise e
    
def read_from_web_CSV(URL):
    try:
        df = pd.read_csv(URL)
        logger.info("CSV file read successfully from the web.")
        return df
    except pd.errors.EmptyDataError as e:
        logger.error("The URL does not point to a valid CSV file. Please check the URL and try again.")
        raise e
    except Exception as e:
        logger.error(f"Failed to read CSV from the web. Error: {e}")
        raise e


In [17]:
# Testing module functions  
field_df = query_data(create_db_engine(db_path), sql_query)   
weather_df = read_from_web_CSV(weather_data_URL)
weather_mapping_df = read_from_web_CSV(weather_mapping_data_URL)

2024-02-24 17:17:15,936 - data_ingestion - INFO - Database engine created successfully.
2024-02-24 17:17:16,177 - data_ingestion - INFO - Query executed successfully.
2024-02-24 17:17:16,726 - data_ingestion - INFO - CSV file read successfully from the web.
2024-02-24 17:17:17,089 - data_ingestion - INFO - CSV file read successfully from the web.


Once we run this we should get a log telling us that it all worked. 

Note that there are imports at the top of the cell. When we move this to a module, it needs to import packages like SQL Alchemy and Pandas. 

Let's do a test to make sure these functions are working well:

In [18]:
field_test = field_df.shape
weather_test = weather_df.shape
weather_mapping_test = weather_mapping_df.shape
print(f"field_df: {field_test}, weather_df: {weather_test}, weather_mapping_df: {weather_mapping_test}")

field_df: (5654, 18), weather_df: (1843, 2), weather_mapping_df: (5654, 3)


**Expected outcome:** 

`field_df: (5654, 18), weather_df: (1843, 2), weather_mapping_df: (5654, 3)`

Before we go and move this into the `data_ingestion.py` file, we are missing one **crucial** aspect. 

**Documentation!** We need to add a module docstring and function docstrings. 

<br>

⚙️ **Task:** Create a **module docstring** and **function docstrings** for each function. Refer to your notes and PEP 8 guidelines (also see `PEP 257`) to guide you on what these docstrings should contain. Edit the code below to make these changes. 

>⚠️ Do not change the logic of this code, only add documentation.

In [56]:
from sqlalchemy import create_engine, text
import logging
import pandas as pd
# Name our logger so we know that logs from this module come from the data_ingestion module
logger = logging.getLogger('data_ingestion')
# Set a basic logging message up that prints out a timestamp, the name of our logger, and the message
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'

sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""

weather_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv"
weather_mapping_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"

In [57]:
### START FUNCTION

def create_db_engine(db_path):
    try:
        engine = create_engine(db_path)
        # Test connection
        with engine.connect() as conn:
            pass
        # test if the database engine was created successfully
        logger.info("Database engine created successfully.")
        return engine # Return the engine object if it all works well
    except ImportError: #If we get an ImportError, inform the user SQLAlchemy is not installed
        logger.error("SQLAlchemy is required to use this function. Please install it first.")
        raise e
    except Exception as e:# If we fail to create an engine inform the user
        logger.error(f"Failed to create database engine. Error: {e}")
        raise e
    
def query_data(engine, sql_query):
    try:
        with engine.connect() as connection:
            df = pd.read_sql_query(text(sql_query), connection)
        if df.empty:
            # Log a message or handle the empty DataFrame scenario as needed
            msg = "The query returned an empty DataFrame."
            logger.error(msg)
            raise ValueError(msg)
        logger.info("Query executed successfully.")
        return df
    except ValueError as e: 
        logger.error(f"SQL query failed. Error: {e}")
        raise e
    except Exception as e:
        logger.error(f"An error occurred while querying the database. Error: {e}")
        raise e
    
def read_from_web_CSV(URL):
    try:
        df = pd.read_csv(URL)
        logger.info("CSV file read successfully from the web.")
        return df
    except pd.errors.EmptyDataError as e:
        logger.error("The URL does not point to a valid CSV file. Please check the URL and try again.")
        raise e
    except Exception as e:
        logger.error(f"Failed to read CSV from the web. Error: {e}")
        raise e
    
### END FUNCTION

Let's test our code to make sure it works before we move on.

In [58]:
# Testing module functions  
field_df = query_data(create_db_engine(db_path), sql_query)   
weather_df = read_from_web_CSV(weather_data_URL)
weather_mapping_df = read_from_web_CSV(weather_mapping_data_URL)

field_test = field_df.shape
weather_test = weather_df.shape
weather_mapping_test = weather_mapping_df.shape
print(f"field_df: {field_test}, weather_df: {weather_test}, weather_mapping_df: {weather_mapping_test}")

2024-02-24 18:14:36,019 - data_ingestion - INFO - Database engine created successfully.
2024-02-24 18:14:37,267 - data_ingestion - INFO - Query executed successfully.


2024-02-24 18:15:04,648 - data_ingestion - INFO - CSV file read successfully from the web.
2024-02-24 18:15:08,118 - data_ingestion - INFO - CSV file read successfully from the web.


field_df: (5654, 18), weather_df: (1843, 2), weather_mapping_df: (5654, 3)


<br> 

⚙️ **Task:** Once the `data_ingestion` code runs smoothly, create a new file, and name it `data_ingestion.py` and import the functions into the notebook. You will have to copy over the import statements and variables too.

In [2]:
# Importing our new module
from data_ingestion import create_db_engine, query_data, read_from_web_CSV

#Checking if the function names are now associated with the module
print(create_db_engine.__module__)
print(query_data.__module__)
print(read_from_web_CSV.__module__)


data_ingestion
data_ingestion
data_ingestion



Now the names `create_db_engine`, `query_data`, `read_from_web_CSV` are linked to the `data_ingestion` module, so our module is imported correctly. Lastly, we run the test commands again to make sure ite sure it works as expected

In [3]:
field_df = query_data(create_db_engine(db_path), sql_query)   
weather_df = read_from_web_CSV(weather_data_URL)
weather_mapping_df = read_from_web_CSV(weather_mapping_data_URL)

field_test = field_df.shape
weather_test = weather_df.shape
weather_mapping_test = weather_mapping_df.shape
print(f"field_df: {field_test}, weather_df: {weather_test}, weather_mapping_df: {weather_mapping_test}")

NameError: name 'db_path' is not defined

And there we go, we have a working data ingestion module! One down, two to go.

## Field data processor

Next up, let's process the field data. This is the code we used last time to process the data, so let's start there:

In [3]:
MD_agric_df = field_df.copy()

MD_agric_df.rename(columns={'Annual_yield': 'Crop_type_Temp', 'Crop_type': 'Annual_yield'}, inplace=True)
MD_agric_df.rename(columns={'Crop_type_Temp': 'Crop_type'}, inplace=True)
MD_agric_df['Elevation'] = MD_agric_df['Elevation'].abs()

# Correcting 'Crop_type' column
def correct_crop_type(crop):
    corrections = {
        'cassaval': 'cassava',
        'wheatn': 'wheat',
        'teaa': 'tea'
    }
    return corrections.get(crop, crop)  # Get the corrected crop type, or return the original if not in corrections

# Apply the correction function to the Crop_type column
MD_agric_df['Crop_type'] = MD_agric_df['Crop_type'].apply(correct_crop_type)

NameError: name 'field_df' is not defined

Here our approach needs to be a bit different. If we create a module with a bunch of functions, we are going to have a hard time moving the DataFrame around the whole time. Instead, we're going to build a Class that encapsulates the whole data processing process for the field-related data called `FieldDataProcessor`. In the class, we will create a DataFrame attribute and methods that alter that attribute. So we encapsulate all of the logic in this `FieldDataProcessor` class, we abstract all of the details and only need to call something like `FieldDataProcessor.process_data()`. 

We could even include Inheritance and Polymorphism if we create a `DataProcessor` super class and create subclasses for `FieldDataProcessor` and `WeatherDataProcessor`.  But, there is a good reason not to. The data handling of the field data is quite different from the handling of the weather data. The field data comes from an SQL database, and we transform the data in a particular way, while the weather data is sourced from a CSV, and processed differently.

So these two processes don't share processing steps, so it makes more sense to make a class for each.

So let's create the class framework:

In [4]:
import pandas as pd
from data_ingestion import create_db_engine, query_data, read_from_web_CSV
import logging

class FieldDataProcessor:

    def __init__(self, logging_level="INFO"): # When we instantiate this class, we can optionally specify what logs we want to see

        # Initialising class with attributes we need. Refer to the code above to understand how each attribute relates to the code
        self.db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'
        self.sql_query = """
            SELECT *
            FROM geographic_features
            LEFT JOIN weather_features USING (Field_ID)
            LEFT JOIN soil_and_crop_features USING (Field_ID)
            LEFT JOIN farm_management_features USING (Field_ID)
            """
        self.columns_to_rename = {'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'}
        self.values_to_rename = {'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'}
        self.weather_map_data = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"

        self.initialize_logging(logging_level)
        
        # We create empty objects to store the DataFrame and engine in
        self.df = None
        self.engine = None
        
    # This method enables logging in the class. 
    def initialize_logging(self, logging_level):
        """
        Sets up logging for this instance of FieldDataProcessor.
        """
        logger_name = __name__ + ".FieldDataProcessor"
        self.logger = logging.getLogger(logger_name)
        self.logger.propagate = False  # Prevents log messages from being propagated to the root logger

        # Set logging level
        if logging_level.upper() == "DEBUG":
            log_level = logging.DEBUG
        elif logging_level.upper() == "INFO":
            log_level = logging.INFO
        elif logging_level.upper() == "NONE":  # Option to disable logging
            self.logger.disabled = True
            return
        else:
            log_level = logging.INFO  # Default to INFO

        self.logger.setLevel(log_level)

        # Only add handler if not already added to avoid duplicate messages
        if not self.logger.handlers:
            ch = logging.StreamHandler()  # Create console handler
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)

        # Use self.logger.info(), self.logger.debug(), etc.


    # DataFrame methods 
    def ingest_sql_data(self):
        # First we want to get the data from the SQL database
        pass
    
    def rename_columns(self):
        # Annual_yield and Crop_type must be swapped
        pass

    def apply_corrections(self):
        # Correct the crop strings, Eg: 'cassaval' -> 'cassava'
        pass

    def weather_station_mapping(self):
        # Merge the weather station data to the main DataFrame
        pass

    def process(self):
        # This process calls the correct methods and applies the changes, step by step. This is the method we will call, and it will call the other methods in order
        
        weather_map_df = self.weather_station_mapping() 
        self.df = self.ingest_sql_data()
        self.df = self.rename_columns()
        self.df = self.apply_corrections()
        self.df = self.df.merge(weather_map_df, on='Field_ID', how='left')
        self.df = self.df.drop(columns="Unnamed: 0")

So this is the idea: We will instantiate the class, and call one method, `.process()` to ingest and clean the data.

In [62]:
# This code won't run for now, since we have not defined all of the methods.
field_processor = FieldDataProcessor()
field_processor.process()

AttributeError: 'NoneType' object has no attribute 'merge'

Now the notebook has a couple of lines of code but does an enormous amount of work to create the data pipeline. If we then call the Class `.df` attribute, we get the DataFrame, which we can analyse.

In [5]:
field_df = field_processor.df

NameError: name 'field_processor' is not defined

So all we have to do is create code for the various methods.

### `def ingest_sql_data()`

Let's create a copy of the class, and start filling out the code for the methods. We're dropping the `.process()` method for now and we'll add it back once it all works. 

⚙️ **Task:** Unscramble the code in the `.ingest_sql_data()` method. The method should return the initial DataFrame.

In [1]:
import pandas as pd
from data_ingestion import create_db_engine, query_data, read_from_web_CSV
import logging

class FieldDataProcessor:

    def __init__(self, logging_level="INFO"): # When we instantiate this class, we can optionally specify what logs we want to see

        # Initialising class with attributes we need. Refer to the code above to understand how each attribute relates to the code
        self.db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'
        self.sql_query = """SELECT *
            FROM geographic_features
            LEFT JOIN weather_features USING (Field_ID)
            LEFT JOIN soil_and_crop_features USING (Field_ID)
            LEFT JOIN farm_management_features USING (Field_ID)
            """
        self.columns_to_rename = {'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'}
        self.values_to_rename = {'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'}
        self.weather_map_data = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"
        
        self.initialize_logging(logging_level)

        # We create empty objects to store the DataFrame and engine in
        self.df = None
        self.engine = None
        
    # This method enables logging in the class.
    def initialize_logging(self, logging_level):
        """
        Sets up logging for this instance of FieldDataProcessor.
        """
        logger_name = __name__ + ".FieldDataProcessor"
        self.logger = logging.getLogger(logger_name)
        self.logger.propagate = False  # Prevents log messages from being propagated to the root logger

        # Set logging level
        if logging_level.upper() == "DEBUG":
            log_level = logging.DEBUG
        elif logging_level.upper() == "INFO":
            log_level = logging.INFO
        elif logging_level.upper() == "NONE":  # Option to disable logging
            self.logger.disabled = True
            return
        else:
            log_level = logging.INFO  # Default to INFO

        self.logger.setLevel(log_level)

        # Only add handler if not already added to avoid duplicate messages
        if not self.logger.handlers:
            ch = logging.StreamHandler()  # Create console handler
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)

        # Use self.logger.info(), self.logger.debug(), etc.


    # let's focus only on this part from now on
    def ingest_sql_data(self):
        self.engine = create_db_engine(self.db_path)
        self.df = query_data(self.engine, self.sql_query)
        self.logger.info("Sucessfully loaded data.")
        return self.df
        
        
    
    def rename_columns(self):
        # Annual_yield and Crop_type must be swapped
        pass

    def apply_corrections(self):
        # Correct the crop strings, Eg: 'cassaval' -> 'cassava'
        pass

    def weather_station_mapping(self):
        # Merge the weather station data to the main DataFrame
        pass

    def process(self):
        # This process calls the correct methods, and applies the changes, step by step. THis is the method we will call, and it will call the other methods in order.
        pass

We can use the code below to check if the code works as expected:

In [2]:
field_processor = FieldDataProcessor()
field_processor.ingest_sql_data()
field_df = field_processor.df
print(field_df.shape)

2024-02-25 15:02:41,982 - data_ingestion - INFO - Database engine created successfully.


2024-02-25 15:02:42,780 - data_ingestion - INFO - Query executed successfully.
2024-02-25 15:02:42,783 - __main__.FieldDataProcessor - INFO - Sucessfully loaded data.


(5654, 18)


**Expected output:**
```python
<Timestamp> - data_ingestion - INFO - Database engine created successfully.
<Timestamp> - data_ingestion - INFO - Query executed successfully.
<Timestamp> - __main__.FieldDataProcessor - INFO - Sucessfully loaded data.
(5654, 18)

### `def rename_columns()`

Next up, we need to add `rename_columns()`

⚙️ **Task:** Copy your class into the top of this cell, and unscramble the code sections in the `.rename_columns()` method.

In [7]:

class FieldDataProcessor:

    def __init__(self, logging_level="INFO"): # When we instantiate this class, we can optionally specify what logs we want to see

        # Initialising class with attributes we need. Refer to the code above to understand how each attribute relates to the code
        self.db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'
        self.sql_query = """SELECT *
            FROM geographic_features
            LEFT JOIN weather_features USING (Field_ID)
            LEFT JOIN soil_and_crop_features USING (Field_ID)
            LEFT JOIN farm_management_features USING (Field_ID)
            """
        self.columns_to_rename = {'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'}
        self.values_to_rename = {'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'}
        self.weather_map_data = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"
        
        self.initialize_logging(logging_level)

        # We create empty objects to store the DataFrame and engine in
        self.df = None
        self.engine = None
        
    # This method enables logging in the class.
    def initialize_logging(self, logging_level):
        """
        Sets up logging for this instance of FieldDataProcessor.
        """
        logger_name = __name__ + ".FieldDataProcessor"
        self.logger = logging.getLogger(logger_name)
        self.logger.propagate = False  # Prevents log messages from being propagated to the root logger

        # Set logging level
        if logging_level.upper() == "DEBUG":
            log_level = logging.DEBUG
        elif logging_level.upper() == "INFO":
            log_level = logging.INFO
        elif logging_level.upper() == "NONE":  # Option to disable logging
            self.logger.disabled = True
            return
        else:
            log_level = logging.INFO  # Default to INFO

        self.logger.setLevel(log_level)

        # Only add handler if not already added to avoid duplicate messages
        if not self.logger.handlers:
            ch = logging.StreamHandler()  # Create console handler
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)

        # Use self.logger.info(), self.logger.debug(), etc.


    # let's focus only on this part from now on
    def ingest_sql_data(self):
        self.engine = create_db_engine(self.db_path)
        self.df = query_data(self.engine, self.sql_query)
        self.logger.info("Sucessfully loaded data.")
        return self.df
        
        # Copy in your class including the ingest_sql_data method here
    
    def rename_columns(self):
         # Extract the columns to rename from the configuration
        column1, column2 = list(self.columns_to_rename.keys())[0], list(self.columns_to_rename.values())[0]   

        # Temporarily rename one of the columns to avoid a naming conflict
        temp_name = "__temp_name_for_swap__"
        while temp_name in self.df.columns:
            temp_name += "_"

        # Perform the swap
        self.df = self.df.rename(columns={column1: temp_name, column2: column1})
        self.df = self.df.rename(columns={temp_name: column2})
           
        
        self.logger.info(f"Swapped columns: {column1} with {column2}")


    def apply_corrections(self):
        # Correct the crop strings, Eg: 'cassaval' -> 'cassava'
        pass

    def weather_station_mapping(self):
        # Merge the weather station data to the main DataFrame
        pass

    def process(self):
        # This process calls the correct methods, and applies the changes, step by step. THis is the method we will call, and it will call the other methods in order.
        pass

Just note something here... `rename_columns()` does not return anything. It doesn't need to, because it is modifying the class attribute (data) `self.df`. This is the benefit of using a class. In each step we are applying changes to the DataFrame within the class. 

This code should instantiate the class, connect to the database, and swap the column names.

**Input:**

In [8]:
field_processor = FieldDataProcessor()
field_processor.ingest_sql_data()
field_processor.rename_columns()
field_df = field_processor.df
field_df['Annual_yield'].head(3)

2024-02-25 10:38:30,702 - data_ingestion - INFO - Database engine created successfully.
2024-02-25 10:38:30,870 - data_ingestion - INFO - Query executed successfully.
2024-02-25 10:38:30,872 - __main__.FieldDataProcessor - INFO - Sucessfully loaded data.
2024-02-25 10:38:30,881 - __main__.FieldDataProcessor - INFO - Swapped columns: Annual_yield with Crop_type


0    0.751354
1    1.069865
2    2.208801
Name: Annual_yield, dtype: float64

**Expected output:**

```python 2024-02-13 14:35:56,581 - data_ingestion - INFO - Database engine created successfully.
<Timestamp> - data_ingestion - INFO - Query executed successfully.
<Timestamp> - __main__.FieldDataProcessor - INFO - Sucessfully loaded data.
<Timestamp> - __main__.FieldDataProcessor - INFO - Swapped columns: Annual_yield with Crop_type
0    0.751354
1    1.069865
2    2.208801
Name: Annual_yield, dtype: float64
```

### `def apply_corrections()`

⚙️ **Task:** Copy your class into the top of this cell, and fill in the `<MISSING CODE>` in the `.apply_corrections()` method.

In [9]:
class FieldDataProcessor:

    def __init__(self, logging_level="INFO"): # When we instantiate this class, we can optionally specify what logs we want to see

        # Initialising class with attributes we need. Refer to the code above to understand how each attribute relates to the code
        self.db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'
        self.sql_query = """SELECT *
            FROM geographic_features
            LEFT JOIN weather_features USING (Field_ID)
            LEFT JOIN soil_and_crop_features USING (Field_ID)
            LEFT JOIN farm_management_features USING (Field_ID)
            """
        self.columns_to_rename = {'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'}
        self.values_to_rename = {'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'}
        self.weather_map_data = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"
        
        self.initialize_logging(logging_level)

        # We create empty objects to store the DataFrame and engine in
        self.df = None
        self.engine = None
        
    # This method enables logging in the class.
    def initialize_logging(self, logging_level):
        """
        Sets up logging for this instance of FieldDataProcessor.
        """
        logger_name = __name__ + ".FieldDataProcessor"
        self.logger = logging.getLogger(logger_name)
        self.logger.propagate = False  # Prevents log messages from being propagated to the root logger

        # Set logging level
        if logging_level.upper() == "DEBUG":
            log_level = logging.DEBUG
        elif logging_level.upper() == "INFO":
            log_level = logging.INFO
        elif logging_level.upper() == "NONE":  # Option to disable logging
            self.logger.disabled = True
            return
        else:
            log_level = logging.INFO  # Default to INFO

        self.logger.setLevel(log_level)

        # Only add handler if not already added to avoid duplicate messages
        if not self.logger.handlers:
            ch = logging.StreamHandler()  # Create console handler
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)

        # Use self.logger.info(), self.logger.debug(), etc.


    # let's focus only on this part from now on
    def ingest_sql_data(self):
        self.engine = create_db_engine(self.db_path)
        self.df = query_data(self.engine, self.sql_query)
        self.logger.info("Sucessfully loaded data.")
        return self.df
        
     # Copy in your class including the ingest_sql_data method here
    
    def rename_columns(self):
         # Extract the columns to rename from the configuration
        column1, column2 = list(self.columns_to_rename.keys())[0], list(self.columns_to_rename.values())[0]   

        # Temporarily rename one of the columns to avoid a naming conflict
        temp_name = "__temp_name_for_swap__"
        while temp_name in self.df.columns:
            temp_name += "_"

        # Perform the swap
        self.df = self.df.rename(columns={column1: temp_name, column2: column1})
        self.df = self.df.rename(columns={temp_name: column2})
           
        
        self.logger.info(f"Swapped columns: {column1} with {column2}")


    
    def apply_corrections(self, column_name='Crop_type', abs_column='Elevation'):
        self.df[abs_column] = self.df[abs_column].abs()
        self.df[column_name] = self.df[column_name].apply(lambda crop: self.values_to_rename.get(crop, crop))

    def weather_station_mapping(self):
        # Merge the weather station data to the main DataFrame
        pass

    def process(self):
        # This process calls the correct methods, and applies the changes, step by step. THis is the method we will call, and it will call the other methods in order.
        pass

We can test if our new method works before we move onto the next one.

**Input:**

In [10]:
field_processor = FieldDataProcessor()
field_processor.ingest_sql_data()
field_processor.rename_columns()
field_processor.apply_corrections()

field_df = field_processor.df
field_df.query("Crop_type in ['cassaval','wheatn']")

2024-02-25 10:38:45,817 - data_ingestion - INFO - Database engine created successfully.
2024-02-25 10:38:45,979 - data_ingestion - INFO - Query executed successfully.
2024-02-25 10:38:45,980 - __main__.FieldDataProcessor - INFO - Sucessfully loaded data.
2024-02-25 10:38:45,987 - __main__.FieldDataProcessor - INFO - Swapped columns: Annual_yield with Crop_type


Unnamed: 0,Field_ID,Elevation,Latitude,Longitude,Location,Slope,Rainfall,Min_temperature_C,Max_temperature_C,Ave_temps,Soil_fertility,Soil_type,pH,Pollution_level,Plot_size,Annual_yield,Crop_type,Standard_yield


**Expected output:**

Empty DataFrame

### `def weather_station_mapping()`

⚙️ **Task:** Copy your class into the top of this cell, and fill in the `<MISSING CODE>` in the `.weather_station_mapping()` method.

In [19]:
class FieldDataProcessor:

    def __init__(self, logging_level="INFO"): # When we instantiate this class, we can optionally specify what logs we want to see

        # Initialising class with attributes we need. Refer to the code above to understand how each attribute relates to the code
        self.db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'
        self.sql_query = """SELECT *
            FROM geographic_features
            LEFT JOIN weather_features USING (Field_ID)
            LEFT JOIN soil_and_crop_features USING (Field_ID)
            LEFT JOIN farm_management_features USING (Field_ID)
            """
        self.columns_to_rename = {'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'}
        self.values_to_rename = {'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'}
        self.weather_map_data = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"
        
        self.initialize_logging(logging_level)

        # We create empty objects to store the DataFrame and engine in
        self.df = None
        self.engine = None
        
    # This method enables logging in the class.
    def initialize_logging(self, logging_level):
        """
        Sets up logging for this instance of FieldDataProcessor.
        """
        logger_name = __name__ + ".FieldDataProcessor"
        self.logger = logging.getLogger(logger_name)
        self.logger.propagate = False  # Prevents log messages from being propagated to the root logger

        # Set logging level
        if logging_level.upper() == "DEBUG":
            log_level = logging.DEBUG
        elif logging_level.upper() == "INFO":
            log_level = logging.INFO
        elif logging_level.upper() == "NONE":  # Option to disable logging
            self.logger.disabled = True
            return
        else:
            log_level = logging.INFO  # Default to INFO

        self.logger.setLevel(log_level)

        # Only add handler if not already added to avoid duplicate messages
        if not self.logger.handlers:
            ch = logging.StreamHandler()  # Create console handler
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)

        # Use self.logger.info(), self.logger.debug(), etc.


    # let's focus only on this part from now on
    def ingest_sql_data(self):
        self.engine = create_db_engine(self.db_path)
        self.df = query_data(self.engine, self.sql_query)
        self.logger.info("Sucessfully loaded data.")
        return self.df
        
     # Copy in your class including the ingest_sql_data method here
    
    def rename_columns(self):
         # Extract the columns to rename from the configuration
        column1, column2 = list(self.columns_to_rename.keys())[0], list(self.columns_to_rename.values())[0]   

        # Temporarily rename one of the columns to avoid a naming conflict
        temp_name = "__temp_name_for_swap__"
        while temp_name in self.df.columns:
            temp_name += "_"

        # Perform the swap
        self.df = self.df.rename(columns={column1: temp_name, column2: column1})
        self.df = self.df.rename(columns={temp_name: column2})
           
        
        self.logger.info(f"Swapped columns: {column1} with {column2}")
        

    def apply_corrections(self, column_name='Crop_type', abs_column='Elevation'):
        self.df[abs_column] = self.df[abs_column].abs()
        self.df[column_name] = self.df[column_name].apply(lambda crop: self.values_to_rename.get(crop, crop))
        
# Copy in your class including the ingest_sql_data and method here

    def weather_station_mapping(self):
        return read_from_web_CSV(self.weather_map_data)
    

    def process(self):
        # This process calls the correct methods and applies the changes, step by step. This is the method we will call, and it will call the other methods in order.
        pass

Once again, test.

**Input:**

In [20]:
field_processor = FieldDataProcessor()
field_processor.ingest_sql_data()
field_processor.rename_columns()
field_processor.apply_corrections()
field_processor.weather_station_mapping()

field_df = field_processor.weather_station_mapping()
field_df['Weather_station'].unique()

2024-02-25 15:14:10,316 - data_ingestion - INFO - Database engine created successfully.
2024-02-25 15:14:10,490 - data_ingestion - INFO - Query executed successfully.
2024-02-25 15:14:10,491 - __main__.FieldDataProcessor - INFO - Sucessfully loaded data.
2024-02-25 15:14:10,497 - __main__.FieldDataProcessor - INFO - Swapped columns: Annual_yield with Crop_type
2024-02-25 15:14:11,370 - data_ingestion - INFO - CSV file read successfully from the web.
2024-02-25 15:14:12,018 - data_ingestion - INFO - CSV file read successfully from the web.


array([0, 1, 2, 3, 4], dtype=int64)

**Expected output:**

```python 
<Timestamp> - data_ingestion - INFO - Database engine created successfully.
<Timestamp> - data_ingestion - INFO - Query executed successfully.
<Timestamp> - __main__.FieldDataProcessor - INFO - Sucessfully loaded data.
<Timestamp> - __main__.FieldDataProcessor - INFO - Swapped columns: Annual_yield with Crop_type
<Timestamp> - data_ingestion - INFO - CSV file read successfully from the web.

array([4, 0, 1, 2, 3], dtype=int64)
```

### `def process()`

Ok, now we put it all together. Remember that the `.process()` method calls all of the other methods in order.

⚙️ **Task:** Copy your class into the top of this cell, and complete the `.process()` method.

In [17]:
class FieldDataProcessor:

    def __init__(self, logging_level="INFO"): # When we instantiate this class, we can optionally specify what logs we want to see

        # Initialising class with attributes we need. Refer to the code above to understand how each attribute relates to the code
        self.db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'
        self.sql_query = """SELECT *
            FROM geographic_features
            LEFT JOIN weather_features USING (Field_ID)
            LEFT JOIN soil_and_crop_features USING (Field_ID)
            LEFT JOIN farm_management_features USING (Field_ID)
            """
        self.columns_to_rename = {'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'}
        self.values_to_rename = {'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'}
        self.weather_map_data = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"
        
        self.initialize_logging(logging_level)

        # We create empty objects to store the DataFrame and engine in
        self.df = None
        self.engine = None
        
    # This method enables logging in the class.
    def initialize_logging(self, logging_level):
        """
        Sets up logging for this instance of FieldDataProcessor.
        """
        logger_name = __name__ + ".FieldDataProcessor"
        self.logger = logging.getLogger(logger_name)
        self.logger.propagate = False  # Prevents log messages from being propagated to the root logger

        # Set logging level
        if logging_level.upper() == "DEBUG":
            log_level = logging.DEBUG
        elif logging_level.upper() == "INFO":
            log_level = logging.INFO
        elif logging_level.upper() == "NONE":  # Option to disable logging
            self.logger.disabled = True
            return
        else:
            log_level = logging.INFO  # Default to INFO

        self.logger.setLevel(log_level)

        # Only add handler if not already added to avoid duplicate messages
        if not self.logger.handlers:
            ch = logging.StreamHandler()  # Create console handler
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)

        # Use self.logger.info(), self.logger.debug(), etc.


    # let's focus only on this part from now on
    def ingest_sql_data(self):
        self.engine = create_db_engine(self.db_path)
        self.df = query_data(self.engine, self.sql_query)
        self.logger.info("Sucessfully loaded data.")
        return self.df
        
     # Copy in your class including the ingest_sql_data method here
    
    def rename_columns(self):
         # Extract the columns to rename from the configuration
        column1, column2 = list(self.columns_to_rename.keys())[0], list(self.columns_to_rename.values())[0]   

        # Temporarily rename one of the columns to avoid a naming conflict
        temp_name = "__temp_name_for_swap__"
        while temp_name in self.df.columns:
            temp_name += "_"

        # Perform the swap
        self.df = self.df.rename(columns={column1: temp_name, column2: column1})
        self.df = self.df.rename(columns={temp_name: column2})
           
        
        self.logger.info(f"Swapped columns: {column1} with {column2}")
        

    def apply_corrections(self, column_name='Crop_type', abs_column='Elevation'):
        self.df[abs_column] = self.df[abs_column].abs()
        self.df[column_name] = self.df[column_name].apply(lambda crop: self.values_to_rename.get(crop, crop))
        
# Copy in your class including the ingest_sql_data and method here

    def weather_station_mapping(self):
        return read_from_web_CSV(self.weather_map_data)

    def process(self):
        self.ingest_sql_data()
        #Insert your code here
        self.ingest_sql_data()
        self.rename_columns()
        self.apply_corrections()
        self.weather_station_mapping()
          

**Input:**

In [18]:
field_processor = FieldDataProcessor()
field_processor.process()

field_df = field_processor.weather_station_mapping()
field_df['Weather_station'].unique()

2024-02-25 15:10:22,657 - data_ingestion - INFO - Database engine created successfully.
2024-02-25 15:10:22,842 - data_ingestion - INFO - Query executed successfully.
2024-02-25 15:10:22,845 - __main__.FieldDataProcessor - INFO - Sucessfully loaded data.
2024-02-25 15:10:22,849 - data_ingestion - INFO - Database engine created successfully.
2024-02-25 15:10:22,995 - data_ingestion - INFO - Query executed successfully.
2024-02-25 15:10:22,999 - __main__.FieldDataProcessor - INFO - Sucessfully loaded data.
2024-02-25 15:10:23,005 - __main__.FieldDataProcessor - INFO - Swapped columns: Annual_yield with Crop_type
2024-02-25 15:10:25,608 - data_ingestion - INFO - CSV file read successfully from the web.
2024-02-25 15:10:27,127 - data_ingestion - INFO - CSV file read successfully from the web.


array([0, 1, 2, 3, 4], dtype=int64)

**Expected output:**

```python
<Timestamp>  - data_ingestion - INFO - Database engine created successfully.
<Timestamp>  - data_ingestion - INFO - Query executed successfully.
<Timestamp>  - __main__.FieldDataProcessor - INFO - Sucessfully loaded data.
<Timestamp>  - __main__.FieldDataProcessor - INFO - Swapped columns: Annual_yield with Crop_type
<Timestamp> - data_ingestion - INFO - CSV file read successfully from the web.

array([4, 0, 1, 2, 3], dtype=int64)
```

### Centralising the data pipeline configuration details

But now we're running into a problem. We stored some data about the SQL database and web files in the `data_ingestion.py` module, and when we load the `field_data_proccessor.py` module we are referencing it again. 

In [None]:
# From the data_ingestion.py module

db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'

sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""

weather_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv"
weather_mapping_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"


# From the field_data_processor class
        self.db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'
        self.sql_query = """SELECT *
            FROM geographic_features
            LEFT JOIN weather_features USING (Field_ID)
            LEFT JOIN soil_and_crop_features USING (Field_ID)
            LEFT JOIN farm_management_features USING (Field_ID)
            """
        self.columns_to_rename = {'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'}
        self.values_to_rename = {'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'}
        self.weather_map_data = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"

So if we want to change the database path, the query, or anything else, where would we do it? It would be better to have one central place where we store all of the specific details of the data pipeline and refer to it in the modules and our main script. A good approach is to create a dictionary in our main script that has all of the parameters, and then we reference it in our modules. 

<br>

⚙️ **Task:** Add the configuration details from the `data_ingestion.py` module into the `config_params` dictionary. 

In [None]:
config_params = {
    "sql_query": """SELECT *
    FROM geographic_features
    LEFT JOIN weather_features USING (Field_ID)
    LEFT JOIN soil_and_crop_features USING (Field_ID)
    LEFT JOIN farm_management_features USING (Field_ID)
    """, # Insert your SQL query
    "db_path":'sqlite:///Maji_Ndogo_farm_survey_small.db', # Insert the db_path of the database
    "columns_to_rename":{'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'} ,# Insert the disctionary of columns we want to swop the names of, 
    "values_to_rename":{'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'}, # Insert the croptype renaming dictionary
    "weather_csv_path": "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv", # Insert the weather data CSV here
    "weather_mapping_csv":"https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv", # Insert the weather data mapping CSV here
}

Now we can remove these lines form the `data_ingestion.py` module file, since we call them from the `FieldDataProcessor` class.

<br>

⚙️ **Task:** Remove the following lines from the `data_ingestion.py` module:

In [None]:
# Remove these lines from the data_ingestion.py module
db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'

sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""

weather_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv"
weather_mapping_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"

In the `FieldDataProcessor` class, instead of passing in the parameters as strings, we reference the `config_params` dictionary instead.

<br>

⚙️ **Task:** Alter the attributes of the `FieldDataProcessor` class to reference the `config_params` dictionary instead. Add `config_params` as a parameter to the class instantiation method as shown below and complete the code.

In [21]:
### START FUNCTION

class FieldDataProcessor:

    def __init__(self, config_params, logging_level="INFO"):  # Make sure to add this line, passing in config_params to the class 
        self.db_path = config_params['db_path']
        self.sql_query = config_params["sql_query"]
        self.columns_to_rename = config_params["columns_to_rename"]
        self.values_to_rename = config_params["values_to_rename"]
        self.weather_map_data = config_params["weather_mapping_csv"]

        # Add the rest of your class code here
        self.initialize_logging(logging_level)

        # We create empty objects to store the DataFrame and engine in
        self.df = None
        self.engine = None
        
    # This method enables logging in the class.
    def initialize_logging(self, logging_level):
        """
        Sets up logging for this instance of FieldDataProcessor.
        """
        logger_name = __name__ + ".FieldDataProcessor"
        self.logger = logging.getLogger(logger_name)
        self.logger.propagate = False  # Prevents log messages from being propagated to the root logger

        # Set logging level
        if logging_level.upper() == "DEBUG":
            log_level = logging.DEBUG
        elif logging_level.upper() == "INFO":
            log_level = logging.INFO
        elif logging_level.upper() == "NONE":  # Option to disable logging
            self.logger.disabled = True
            return
        else:
            log_level = logging.INFO  # Default to INFO

        self.logger.setLevel(log_level)

        # Only add handler if not already added to avoid duplicate messages
        if not self.logger.handlers:
            ch = logging.StreamHandler()  # Create console handler
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)

        # Use self.logger.info(), self.logger.debug(), etc.


    # let's focus only on this part from now on
    def ingest_sql_data(self):
        self.engine = create_db_engine(self.db_path)
        self.df = query_data(self.engine, self.sql_query)
        self.logger.info("Sucessfully loaded data.")
        return self.df
        
     # Copy in your class including the ingest_sql_data method here
    
    def rename_columns(self):
         # Extract the columns to rename from the configuration
        column1, column2 = list(self.columns_to_rename.keys())[0], list(self.columns_to_rename.values())[0]   

        # Temporarily rename one of the columns to avoid a naming conflict
        temp_name = "__temp_name_for_swap__"
        while temp_name in self.df.columns:
            temp_name += "_"

        # Perform the swap
        self.df = self.df.rename(columns={column1: temp_name, column2: column1})
        self.df = self.df.rename(columns={temp_name: column2})
           
        
        self.logger.info(f"Swapped columns: {column1} with {column2}")
        

    def apply_corrections(self, column_name='Crop_type', abs_column='Elevation'):
        self.df[abs_column] = self.df[abs_column].abs()
        self.df[column_name] = self.df[column_name].apply(lambda crop: self.values_to_rename.get(crop, crop))
        
# Copy in your class including the ingest_sql_data and method here

    def weather_station_mapping(self):
        return read_from_web_CSV(self.weather_map_data)

    def process(self):
        self.ingest_sql_data()
        #Insert your code here
        self.ingest_sql_data()
        self.rename_columns()
        self.apply_corrections()
        self.weather_station_mapping()
      
        
### END FUNCTION

<br>

⚙️ **Task:** Instantiate the class with the new dictionary.

**Input:**

In [22]:
config_params = {
    "sql_query": """SELECT *
    FROM geographic_features
    LEFT JOIN weather_features USING (Field_ID)
    LEFT JOIN soil_and_crop_features USING (Field_ID)
    LEFT JOIN farm_management_features USING (Field_ID)
    """, # Insert your SQL query
    "db_path":'sqlite:///Maji_Ndogo_farm_survey_small.db', # Insert the db_path of the database
    "columns_to_rename":{'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'} ,# Insert the disctionary of columns we want to swop the names of, 
    "values_to_rename":{'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'}, # Insert the croptype renaming dictionary
    "weather_csv_path": "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv", # Insert the weather data CSV here
    "weather_mapping_csv":"https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv", # Insert the weather data mapping CSV here
}

field_processor = FieldDataProcessor(config_params)
field_processor.process()

field_df = field_processor.weather_station_mapping()
field_df['Weather_station'].unique()

2024-02-25 15:18:38,122 - data_ingestion - INFO - Database engine created successfully.
2024-02-25 15:18:38,303 - data_ingestion - INFO - Query executed successfully.
2024-02-25 15:18:38,304 - __main__.FieldDataProcessor - INFO - Sucessfully loaded data.
2024-02-25 15:18:38,307 - data_ingestion - INFO - Database engine created successfully.
2024-02-25 15:18:38,461 - data_ingestion - INFO - Query executed successfully.
2024-02-25 15:18:38,463 - __main__.FieldDataProcessor - INFO - Sucessfully loaded data.
2024-02-25 15:18:38,471 - __main__.FieldDataProcessor - INFO - Swapped columns: Annual_yield with Crop_type
2024-02-25 15:18:47,155 - data_ingestion - INFO - CSV file read successfully from the web.
2024-02-25 15:19:08,554 - data_ingestion - INFO - CSV file read successfully from the web.


array([0, 1, 2, 3, 4], dtype=int64)

**Expected output:**

```python
<Timestamp>  - data_ingestion - INFO - Database engine created successfully.
<Timestamp>  - data_ingestion - INFO - Query executed successfully.
<Timestamp>  - __main__.FieldDataProcessor - INFO - Sucessfully loaded data.
<Timestamp>  - __main__.FieldDataProcessor - INFO - Swapped columns: Annual_yield with Crop_type
<Timestamp> - data_ingestion - INFO - CSV file read successfully from the web.

array([4, 0, 1, 2, 3], dtype=int64)
```

<br> 

### Creating `field_data_processor.py`

Now we have a robust data processing class for the field-related data. Our final step is to create the module file and document our code.

<br>

⚙️ **Task:** Complete the `field_data_processor` module. Include all of the required content, ensure the module is PEP 8 complient, include all imports and parameter definitions, and create the `field_data_processor.py` module file.

Restart the kernel before running this code:

**Input:**

In [1]:
import re # Importing all the packages we will use eventually
import numpy as np
import pandas as pd
from field_data_processor import FieldDataProcessor # Importing our new module
import logging 

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

config_params = {
    "sql_query": """SELECT *
    FROM geographic_features
    LEFT JOIN weather_features USING (Field_ID)
    LEFT JOIN soil_and_crop_features USING (Field_ID)
    LEFT JOIN farm_management_features USING (Field_ID)
    """, # Insert your SQL query
    "db_path":'sqlite:///Maji_Ndogo_farm_survey_small.db', # Insert the db_path of the database
    "columns_to_rename":{'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'} ,# Insert the disctionary of columns we want to swop the names of, 
    "values_to_rename":{'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'}, # Insert the croptype renaming dictionary
    "weather_csv_path": "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv", # Insert the weather data CSV here
    "weather_mapping_csv":"https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv", # Insert the weather data mapping CSV here
}


# Instantiating the class with config_params passed to the class as a parameter 
field_processor = FieldDataProcessor(config_params)
field_processor.process()

# Test
field_df['Weather_station'].unique()

2024-02-25 12:07:25,722 - data_ingestion - INFO - Database engine created successfully.


data_ingestion
data_ingestion
data_ingestion


2024-02-25 12:07:25,878 - data_ingestion - INFO - Query executed successfully.
2024-02-25 12:07:25,880 - field_data_processor.FieldDataProcessor - INFO - Sucessfully loaded data.
2024-02-25 12:07:25,884 - data_ingestion - INFO - Database engine created successfully.
2024-02-25 12:07:26,099 - data_ingestion - INFO - Query executed successfully.
2024-02-25 12:07:26,104 - field_data_processor.FieldDataProcessor - INFO - Sucessfully loaded data.
2024-02-25 12:07:26,109 - field_data_processor.FieldDataProcessor - INFO - Swapped columns: Annual_yield with Crop_type
2024-02-25 12:07:27,407 - data_ingestion - INFO - CSV file read successfully from the web.


NameError: name 'field_df' is not defined

**Expected output:**

```python
<Timestamp>  - data_ingestion - INFO - Database engine created successfully.
<Timestamp>  - data_ingestion - INFO - Query executed successfully.
<Timestamp>  - field_data_processor.FieldDataProcessor - INFO - Sucessfully loaded data.
<Timestamp>  - field_data_processor.FieldDataProcessor - INFO - Swapped columns: Annual_yield with Crop_type
<Timestamp> - data_ingestion - INFO - CSV file read successfully from the web.

array([4, 0, 1, 2, 3], dtype=int64)
```

## Weather data processor

Now for the last module. The `WeatherDataProcessor` class will be dealing with all of the weather-related data. Again we want to instantiate the class, then call a `.process()` method to import and clean the data. Here is the code we used last time:

In [2]:
import re # Importing the regex pattern
import numpy as np

weather_station_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv")
weather_station_mapping_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv")

patterns = {
    'Rainfall': r'(\d+(\.\d+)?)\s?mm',
     'Temperature': r'(\d+(\.\d+)?)\s?C',
    'Pollution_level': r'=\s*(-?\d+(\.\d+)?)|Pollution at \s*(-?\d+(\.\d+)?)'
    }

def extract_measurement(message):
    """
    Extracts a numeric measurement value from a given message string.

    The function applies regular expressions to identify and extract
    numeric values related to different types of measurements such as
    Rainfall, Average Temperatures, and Pollution Levels from a text message.
    It returns the key of the matching record, and first matching value as a floating-point number.
    
    Parameters:
    message (str): A string message containing the measurement information.

    Returns:
    float: The extracted numeric value of the measurement if a match is found;
           otherwise, None.

    The function uses the following patterns for extraction:
    - Rainfall: Matches numbers (including decimal) followed by 'mm', optionally spaced.
    - Ave_temps: Matches numbers (including decimal) followed by 'C', optionally spaced.
    - Pollution_level: Matches numbers (including decimal) following 'Pollution at' or '='.
    
    Example usage:
    extract_measurement("【2022-01-04 21:47:48】温度感应: 现在温度是 12.82C.")
    # Returns: 'Temperature', 12.82
    """
    
    for key, pattern in patterns.items(): # Loop through all of the patterns and check if it matches the pattern value.
        match = re.search(pattern, message)
        if match:
            # Extract the first group that matches, which should be the measurement value if all previous matches are empty.
            # print(match.groups()) # Uncomment this line to help you debug your regex patterns.
            return key, float(next((x for x in match.groups() if x is not None)))
    
    return None, None

# The function creates a tuple with the measurement type and value into a Pandas Series
result = weather_station_df['Message'].apply(extract_measurement)

# Create separate columns for 'Measurement' and 'extracted_value' by unpacking the tuple with Lambda functions.
weather_station_df['Measurement'] = result.apply(lambda x: x[0])
weather_station_df['Value'] = result.apply(lambda x: x[1])

# The function creates a tuple with the measurement type and value into a Pandas Series
result = weather_station_df['Message'].apply(extract_measurement)

# Create separate columns for 'Measurement' and 'extracted_value' by unpacking the tuple with Lambda functions.
weather_station_df['Measurement'] = result.apply(lambda x: x[0])
weather_station_df['Value'] = result.apply(lambda x: x[1])


Luckily the other team did most of the work this time, so we only have to fill in a couple of details. First off, we need to add more keys to the `config_params` dictionary. Specifically the regex pattern we used to get the messages, and the URL of the weather data.

<br>

⚙️ **Task:** Complete the values for the new keys in `config_params`.

In [6]:
import pandas as pd
weather_station_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv")
weather_station_mapping_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv")

patterns = {
    'Rainfall': r'(\d+(\.\d+)?)\s?mm',
     'Temperature': r'(\d+(\.\d+)?)\s?C',
    'Pollution_level': r'=\s*(-?\d+(\.\d+)?)|Pollution at \s*(-?\d+(\.\d+)?)'
    }

config_params = {
    "sql_query": """SELECT *
    FROM geographic_features
    LEFT JOIN weather_features USING (Field_ID)
    LEFT JOIN soil_and_crop_features USING (Field_ID)
    LEFT JOIN farm_management_features USING (Field_ID)
    """, # Insert your SQL query
    "db_path":'sqlite:///Maji_Ndogo_farm_survey_small.db', # Insert the db_path of the database
    "columns_to_rename":{'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'} ,# Insert the disctionary of columns we want to swop the names of, 
    "values_to_rename":{'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'}, # Insert the croptype renaming dictionary
    "weather_csv_path": "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv", # Insert the weather data CSV here
    "weather_mapping_csv":"https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv", # Insert the weather data mapping CSV here
    # Paste in your previous dictionary data in here

    # Add two new keys
   "weather_csv_path":"https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv", # Insert the URL for the weather station data
    "regex_patterns" : {
    'Rainfall': r'(\d+(\.\d+)?)\s?mm',
     'Temperature': r'(\d+(\.\d+)?)\s?C',
    'Pollution_level': r'=\s*(-?\d+(\.\d+)?)|Pollution at \s*(-?\d+(\.\d+)?)'
    } , # Insert the regex pattern we used to process the messages
}

Then we already have the class fully set up. We just have to make sure the formatting is correct, and that the module is documented properly.

⚙️ **Task:** Complete the `weather_data_processor` module. Include all of the required content, ensure the module is PEP 8 compliant, include all imports and parameter definitions, and create the `weather_data_processor.py` module file.

In [3]:
# These are the imports we're going to use in the weather data processing module
import re
import numpy as np
import pandas as pd
import logging
from data_ingestion import read_from_web_CSV

In [4]:
### START FUNCTION 

class WeatherDataProcessor:
    def __init__(self, config_params, logging_level="INFO"): # Now we're passing in the confi_params dictionary already
        self.weather_station_data = config_params['weather_csv_path']
        self.patterns = config_params['regex_patterns']
        self.weather_df = None  # Initialize weather_df as None or as an empty DataFrame
        self.initialize_logging(logging_level)

    def initialize_logging(self, logging_level):
        logger_name = __name__ + ".WeatherDataProcessor"
        self.logger = logging.getLogger(logger_name)
        self.logger.propagate = False  # Prevents log messages from being propagated to the root logger

        # Set logging level
        if logging_level.upper() == "DEBUG":
            log_level = logging.DEBUG
        elif logging_level.upper() == "INFO":
            log_level = logging.INFO
        elif logging_level.upper() == "NONE":  # Option to disable logging
            self.logger.disabled = True
            return
        else:
            log_level = logging.INFO  # Default to INFO

        self.logger.setLevel(log_level)

        # Only add handler if not already added to avoid duplicate messages
        if not self.logger.handlers:
            ch = logging.StreamHandler()  # Create console handler
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)

    def weather_station_mapping(self):
        self.weather_df = read_from_web_CSV(self.weather_station_data)
        self.logger.info("Successfully loaded weather station data from the web.") 
        # Here, you can apply any initial transformations to self.weather_df if necessary.

    
    def extract_measurement(self, message):
        for key, pattern in self.patterns.items():
            match = re.search(pattern, message)
            if match:
                self.logger.debug(f"Measurement extracted: {key}")
                return key, float(next((x for x in match.groups() if x is not None)))
        self.logger.debug("No measurement match found.")
        return None, None

    def process_messages(self):
        if self.weather_df is not None:
            result = self.weather_df['Message'].apply(self.extract_measurement)
            self.weather_df['Measurement'], self.weather_df['Value'] = zip(*result)
            self.logger.info("Messages processed and measurements extracted.")
        else:
            self.logger.warning("weather_df is not initialized, skipping message processing.")
        return self.weather_df

    def calculate_means(self):
        if self.weather_df is not None:
            means = self.weather_df.groupby(by=['Weather_station_ID', 'Measurement'])['Value'].mean()
            self.logger.info("Mean values calculated.")
            return means.unstack()
        else:
            self.logger.warning("weather_df is not initialized, cannot calculate means.")
            return None
    
    def process(self):
        self.weather_station_mapping()  # Load and assign data to weather_df
        self.process_messages()  # Process messages to extract measurements
        self.logger.info("Data processing completed.")
### END FUNCTION

Once we have the `weather_data_processor` module set up, we can run the code below to import the new module, and make sure our module worked correctly.

**Input:**

In [8]:
import re
import numpy as np
import pandas as pd
# from field_data_processor import FieldDataProcessor
from weather_data_processor import WeatherDataProcessor
import logging 

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

config_params = {
    "sql_query": """SELECT *
    FROM geographic_features
    LEFT JOIN weather_features USING (Field_ID)
    LEFT JOIN soil_and_crop_features USING (Field_ID)
    LEFT JOIN farm_management_features USING (Field_ID)
    """, # Insert your SQL query
    "db_path":'sqlite:///Maji_Ndogo_farm_survey_small.db', # Insert the db_path of the database
    "columns_to_rename":{'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'} ,# Insert the disctionary of columns we want to swop the names of, 
    "values_to_rename":{'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'}, # Insert the croptype renaming dictionary
    "weather_csv_path": "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv", # Insert the weather data CSV here
    "weather_mapping_csv":"https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv", # Insert the weather data mapping CSV here
    # Paste in your previous dictionary data in here

    # Add two new keys
   "weather_csv_path":"https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv", # Insert the URL for the weather station data
    "regex_patterns" : {
    'Rainfall': r'(\d+(\.\d+)?)\s?mm',
     'Temperature': r'(\d+(\.\d+)?)\s?C',
    'Pollution_level': r'=\s*(-?\d+(\.\d+)?)|Pollution at \s*(-?\d+(\.\d+)?)'
    }  , # Insert the regex pattern we used to process the messages
}

# Ignoring the field data for now.
# field_processor = FieldDataProcessor(config_params)
# field_processor.process()
# field_df = field_processor.df

weather_processor = WeatherDataProcessor(config_params)
weather_processor.process()
weather_df = weather_processor.weather_df

weather_df['Measurement'].unique()

2024-02-25 16:33:26,780 - data_ingestion - INFO - CSV file read successfully from the web.
2024-02-25 16:33:26,781 - weather_data_processor.WeatherDataProcessor - INFO - Successfully loaded weather station data from the web.
2024-02-25 16:33:26,843 - weather_data_processor.WeatherDataProcessor - INFO - Messages processed and measurements extracted.
2024-02-25 16:33:26,845 - weather_data_processor.WeatherDataProcessor - INFO - Data processing completed.


array(['Temperature', 'Pollution_level', 'Rainfall'], dtype=object)

**Expected output:**

```python
<Timestamp> - data_ingestion - INFO - CSV file read successfully from the web.
<Timestamp> - __main__.WeatherDataProcessor  - INFO - Successfully loaded weather station data from the web.
<Timestamp> - __main__.WeatherDataProcessor  - INFO - Messages processed and measurements extracted.
<Timestamp> - __main__.WeatherDataProcessor  - INFO - Data processing completed.

array(['Temperature', 'Pollution_level', 'Rainfall'], dtype=object)
```

### Validating our data pipeline

So we finally have working modules that now automatically pull data from the database  (or the web), process it, clean it, and return our starting DataFrame. Before we jump in and analyse the data, let's pause for a second and ask: Did the changes actually get applied? Did we correct the elevation data, did we rename the columns? We could go back to the old ways, and create queries to check, but a better way is to **test our dataset**. 

Let's get the data in first. Remember to use your `config_params` dictionary.

In [2]:
import re
import numpy as np
import pandas as pd
from field_data_processor import FieldDataProcessor
from weather_data_processor import WeatherDataProcessor
import logging 

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

config_params = {
    "sql_query": """SELECT *
    FROM geographic_features
    LEFT JOIN weather_features USING (Field_ID)
    LEFT JOIN soil_and_crop_features USING (Field_ID)
    LEFT JOIN farm_management_features USING (Field_ID)
    """, # Insert your SQL query
    "db_path":'sqlite:///Maji_Ndogo_farm_survey_small.db', # Insert the db_path of the database
    "columns_to_rename":{'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'} ,# Insert the disctionary of columns we want to swop the names of, 
    "values_to_rename":{'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'}, # Insert the croptype renaming dictionary
    "weather_csv_path": "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv", # Insert the weather data CSV here
    "weather_mapping_csv":"https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv", # Insert the weather data mapping CSV here
    # Paste in your previous dictionary data in here

    # Add two new keys
   "weather_csv_path":"https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv", # Insert the URL for the weather station data
    "regex_patterns" :  {
    'Rainfall': r'(\d+(\.\d+)?)\s?mm',
     'Temperature': r'(\d+(\.\d+)?)\s?C',
    'Pollution_level': r'=\s*(-?\d+(\.\d+)?)|Pollution at \s*(-?\d+(\.\d+)?)'
    }  , # Insert the regex pattern we used to process the messages
}

field_processor = FieldDataProcessor(config_params)
field_processor.process()
field_df = field_processor.df

weather_processor = WeatherDataProcessor(config_params)
weather_processor.process()
weather_df = weather_processor.weather_df

2024-02-25 16:41:09,063 - data_ingestion - INFO - Database engine created successfully.


data_ingestion
data_ingestion
data_ingestion


2024-02-25 16:41:09,235 - data_ingestion - INFO - Query executed successfully.
2024-02-25 16:41:09,236 - field_data_processor.FieldDataProcessor - INFO - Sucessfully loaded data.
2024-02-25 16:41:09,244 - field_data_processor.FieldDataProcessor - INFO - Swapped columns: Annual_yield with Crop_type
2024-02-25 16:41:10,214 - data_ingestion - INFO - CSV file read successfully from the web.
2024-02-25 16:41:10,768 - data_ingestion - INFO - CSV file read successfully from the web.
2024-02-25 16:41:10,770 - weather_data_processor.WeatherDataProcessor - INFO - Successfully loaded weather station data from the web.
2024-02-25 16:41:10,827 - weather_data_processor.WeatherDataProcessor - INFO - Messages processed and measurements extracted.
2024-02-25 16:41:10,828 - weather_data_processor.WeatherDataProcessor - INFO - Data processing completed.


There should be a `validate_data.py` file in the notebook directory. This is a `pytest` script that does a couple of tests to see if the data we're expecting, is what we actually have. Have a look at the test script, and try to understand what we're testing.

`pytest` normally runs from the command line because it is set up to be automated. To test the data, we have to give `pytest` access to that data. The simplest way to do this is by creating CSV files, importing them into `validate_data.py`, and running the tests.

The following code creates CSV files, runs `pytest` in the terminal using `!pytest validate_data.py -v`, and deletes the CSV files once the test is complete.

In [4]:
# %pip install pytest

weather_df.to_csv('sampled_weather_df.csv', index=False)
field_df.to_csv('sampled_field_df.csv', index=False)

!pytest validate_data.py -v

import os# Define the file paths
weather_csv_path = 'sampled_weather_df.csv'
field_csv_path = 'sampled_field_df.csv'

# Delete sampled_weather_df.csv if it exists
if os.path.exists(weather_csv_path):
    os.remove(weather_csv_path)
    print(f"Deleted {weather_csv_path}")
else:
    print(f"{weather_csv_path} does not exist.")

# Delete sampled_field_df.csv if it exists
if os.path.exists(field_csv_path):
    os.remove(field_csv_path)
    print(f"Deleted {field_csv_path}")
else:
    print(f"{field_csv_path} does not exist.")

platform win32 -- Python 3.12.0, pytest-8.0.2, pluggy-1.4.0 -- c:\Users\user\AppData\Local\Programs\Python\Python312\python.exe
cachedir: .pytest_cache
rootdir: g:\alx full  folder\Alx ds folder\python\code challenge\Data validation
[1mcollecting ... [0mcollected 0 items

Deleted sampled_weather_df.csv
Deleted sampled_field_df.csv


**Expected output:**

```python
============================ test session starts =============================
platform win32 -- Python 3.12.1, pytest-8.0.0, pluggy-1.4.0 -- ...
cachedir: .pytest_cache
rootdir: ...
plugins: anyio-4.2.0
collecting ... collected 7 items

validate_data.py::test_read_weather_DataFrame_shape PASSED               [ 14%]
validate_data.py::test_read_field_DataFrame_shape PASSED                 [ 28%]
validate_data.py::test_weather_DataFrame_columns PASSED                  [ 42%]
validate_data.py::test_field_DataFrame_columns PASSED                    [ 57%]
validate_data.py::test_field_DataFrame_non_negative_elevation PASSED     [ 71%]
validate_data.py::test_crop_types_are_valid PASSED                       [ 85%]
validate_data.py::test_positive_rainfall_values PASSED                   [100%]

============================== warnings summary ===============================
..\..\..\..\..\..\..\..\anaconda3\envs\Latest\Lib\site-packages\dateutil\tz\tz.py:37
  ...: DeprecationWarning: datetime.datetime.utcfromtimestamp() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.fromtimestamp(timestamp, datetime.UTC).
    EPOCH = datetime.datetime.utcfromtimestamp(0)

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
======================== 7 passed, 1 warning in 1.13s =========================
Deleted sampled_weather_df.csv
Deleted sampled_field_df.csv
```

>⚠️ Depending on the version of Python, there may be various warnings like the one above. These are normally `DeprecationWarnings` so we can safely ignore these for now. We're interested in whether all the dataset tests passed. 

Great! Now we know our data resembles what we expect! As our project evolves we may have to add more module functionality or create more rigorous tests of the data. 


# Validating the dataset

Before we actually get to the analysis part, take a moment to notice how much simpler this data import is now. It feels like a lot of work, but now one cell of code imports and cleans all of our data.

In [6]:
import re
import numpy as np
import pandas as pd
from field_data_processor import FieldDataProcessor
from weather_data_processor import WeatherDataProcessor
import logging 

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

config_params =  {
    "sql_query": """SELECT *
    FROM geographic_features
    LEFT JOIN weather_features USING (Field_ID)
    LEFT JOIN soil_and_crop_features USING (Field_ID)
    LEFT JOIN farm_management_features USING (Field_ID)
    """, # Insert your SQL query
    "db_path":'sqlite:///Maji_Ndogo_farm_survey_small.db', # Insert the db_path of the database
    "columns_to_rename":{'Annual_yield': 'Crop_type', 'Crop_type': 'Annual_yield'} ,# Insert the disctionary of columns we want to swop the names of, 
    "values_to_rename":{'cassaval': 'cassava', 'wheatn': 'wheat', 'teaa': 'tea'}, # Insert the croptype renaming dictionary
    "weather_csv_path": "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv", # Insert the weather data CSV here
    "weather_mapping_csv":"https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv", # Insert the weather data mapping CSV here
    # Paste in your previous dictionary data in here

    # Add two new keys
   "weather_csv_path":"https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv", # Insert the URL for the weather station data
    "regex_patterns" : {
    'Rainfall': r'(\d+(\.\d+)?)\s?mm',
     'Temperature': r'(\d+(\.\d+)?)\s?C',
    'Pollution_level': r'=\s*(-?\d+(\.\d+)?)|Pollution at \s*(-?\d+(\.\d+)?)'
    } , # Insert the regex pattern we used to process the messages
}
field_processor = FieldDataProcessor(config_params)
field_processor.process()
field_df = field_processor.df

weather_processor = WeatherDataProcessor(config_params)
weather_processor.process()
weather_df = weather_processor.weather_df

# Rename 'Ave_temps' in field_df to 'Temperature' to match weather_df
field_df.rename(columns={'Ave_temps': 'Temperature'}, inplace=True)

2024-02-25 16:51:29,957 - data_ingestion - INFO - Database engine created successfully.
2024-02-25 16:51:30,117 - data_ingestion - INFO - Query executed successfully.
2024-02-25 16:51:30,118 - field_data_processor.FieldDataProcessor - INFO - Sucessfully loaded data.
2024-02-25 16:51:30,124 - field_data_processor.FieldDataProcessor - INFO - Swapped columns: Annual_yield with Crop_type
2024-02-25 16:51:31,120 - data_ingestion - INFO - CSV file read successfully from the web.
2024-02-25 16:51:34,935 - data_ingestion - INFO - CSV file read successfully from the web.
2024-02-25 16:51:34,937 - weather_data_processor.WeatherDataProcessor - INFO - Successfully loaded weather station data from the web.
2024-02-25 16:51:35,108 - weather_data_processor.WeatherDataProcessor - INFO - Messages processed and measurements extracted.
2024-02-25 16:51:35,109 - weather_data_processor.WeatherDataProcessor - INFO - Data processing completed.


Ok, now we can circle back to the start. As I mentioned, setting a tolerance might have been a simple way to measure if our field data and weather data agree, but we didn't take into account if either dataset was spread out.

I hope you have some idea of the problem, but we need to tell this story anyway, so hopefully, I can convince you I made an error last time, by the time we're done.

Back to our initial plan:

1. Create a null hypothesis.
1. Import the `MD_agric_df` dataset and clean it up.
1. Import the weather data.
1. Map the weather data to the field data.
1. Calculate the means of the weather station dataset and the means of the main dataset.
2. Calculate all the parameters we need to do a t-test. 
3. Interpret our results.

## Hypothesis

So what are we testing with our null hypothesis $H_0$? Well, we want to know if our field data is representing the reality in Maji Ndogo by looking at an independent set of data. If our field data (means) are the same as the weather data (means), then it indicates no significant difference between the datasets. We're essentially saying that any difference we see between these means is because of randomness. However, if the means differ significantly, we'll know there is a reason for it, and that it is not just a random fluctuation in the data. 

<br>

Given a significance level $\alpha$ of 0.05 for a two-tailed test, we have the following conditions for our hypothesis test at a 95% confidence interval:

- $H_0$: There is no significant difference between the means of the two datasets. This is expressed as $\mu_{field} = \mu_{weather}$.

- $H_a$: There is a significant difference between the means of the two datasets. This is expressed as $\mu_{field} \neq \mu_{weather}$.

<br>

If the p-value obtained from the test:
- is less than or equal to the significance level, so $p \leq \alpha$, we reject the null hypothesis.
- is larger than the significance level, so $p > \alpha$, we cannot reject the null hypothesis, as we cannot find a statistically significant difference between the datasets at the 95% confidence level.

Now, let's code it out. 

First, we're going to import all of the packages and define a few variables. You might notice we're importing a new method, `.ttest_ind()`. This method takes in two data columns and calculates means, variance, and returns the the t- and p-statistics. So our t-test is reduced to one line. Since our alternative hypothesis does not make a claim of greater or less than, we will use the two-sided t-test, by adding  the `alternative = 'two-sided'` keyword.

In [1]:
from scipy.stats import ttest_ind
import numpy as np

# Now, the measurements_to_compare can directly use 'Temperature', 'Rainfall', and 'Pollution_level'
measurements_to_compare = ['Temperature', 'Rainfall', 'Pollution_level']

ModuleNotFoundError: No module named 'scipy'

Let's pause for a second and clarify what exactly we're comparing. 

We want to compare the means of the temperature, rainfall, and pollution data, for fields assigned to a specific weather station. So for both datasets, we need to isolate the measurement type and weather station for each data, so we're comparing the correct means.

Let's break down what we need to do:
1. We need to filter both `field_df` and `weather_df` based on the given station ID and measurement. We can use `filter_field_data(df, station_id, measurement)` and `filter_weather_data(df, station_id, measurement)`.  
2. We need to perform a t-test to conduct the t-test on the filtered data. So we're going to use `ttest_ind(data_col1, data_col2, equal_var=False)` from `scipy.stats`.
3. `print_ttest_results(station_id, measurement, p_val, alpha)` to interpret and print the results from the t-test.

We'll first define these functions, focusing on `Temperature` for `station ID = 0`. Then, we'll integrate these functions into a loop that iterates over each station ID and measurement type.

<br> 

⚙️ **Task:** Create a `filter_field_data` function that takes in the `field_df` DataFrame, the `station_id`, and `measurement` type, and retuns a **single column** (series) of data filtered by the `station_id`, and `measurement`.

In [None]:
### START FUNCTION
def filter_field_data(df, station_id, measurement):

    return df[(df['station_id'] == station_id) & (df['measurement'] == measurement)]
    
### END FUNCTION

<br>

**Input 1:**

In [None]:
# Example for station ID 0 and Temperature
station_id = 0
alpha = 0.05
measurement = 'Temperature'

# Filter data for the specific station and measurement
field_values = filter_field_data(field_df, station_id, measurement)
field_values

**Expected outcome:**

```python
1       13.35
2       13.30
8       12.80
10      13.70
14      13.35
        ...  
5627    13.30
5630    14.25
5632    11.00
5638    13.30
5642    12.85
Name: Temperature, Length: 1375, dtype: float64
```

<br>

**Input 2:**

In [None]:
# Example for station ID 0 and Temperature
station_id = 0
alpha = 0.05
measurement = 'Temperature'

# Filter data for the specific station and measurement
field_values = filter_field_data(field_df, station_id, measurement)
print(f"Shape: {field_values.shape}, First value: {field_values.iloc[0]} ")

**Expected outcome:**

`Shape: (1375,), First value: 13.35 `

<br> 

⚙️ **Task:** Create a data filter function that takes in the `weather_df` DataFrame, the `station_id`, and `measurement` type, and returns a **single column** (series) of data filtered by the `station_id`, and `measurement`.

In [None]:
### START FUNCTION

def filter_weather_data(df, station_id, measurement):
    df = df[(df['station_id'] == station_id) & (df['measurement'] == measurement)]
    return df

### END FUNCTION

<br> 

**Input 1:**

In [None]:
# Example for station ID 0 and Temperature
station_id = 0
alpha = 0.05
measurement = 'Temperature'

# Filter data for the specific station and measurement

weather_values = filter_weather_data(weather_df, station_id, measurement)
weather_values


**Expected outcome:**

```python
0       12.82
2       14.53
29      14.28
32      12.87
67      13.13
        ...  
1804    12.77
1805    14.13
1817    13.14
1833    14.14
1834    13.61
Name: Value, Length: 100, dtype: float64
```

<br> 

**Input 2:**

In [None]:
# Example for station ID 0 and Temperature
station_id = 0
alpha = 0.05
measurement = 'Temperature'

# Filter data for the specific station and measurement

weather_values = filter_weather_data(weather_df, station_id, measurement)

print(f"Shape: {weather_values.shape}, First value: {weather_values.iloc[0]}")

**Expected outcome:**

`Shape: (100,), First value: 12.82 `

⚙️ **Task:** Create a function that calculates the t-statistic and p-value. The function should accept two **single columns** of data and return a tuple of the t-statistic and p-value.

In [None]:
### START FUNCTION
def run_ttest(Column_A, Column_B):
    t_statisctic, p_value = stats.ttest_ind(Column_A, Column_B)
    return t_statisctic, p_value
    
### END FUNCTION

<br> 

**Input:**

In [None]:
# Example for station ID 0 and Temperature
station_id = 0
alpha = 0.05
measurement = 'Temperature'

# Filter data for the specific station and measurement
field_values = filter_field_data(field_df, station_id, measurement)
weather_values = filter_weather_data(weather_df, station_id, measurement)

# Perform t-test
t_stat, p_val = run_ttest(field_values, weather_values)
print(f"T-stat: {t_stat:.5f}, p-value: {p_val:.5f}")

**Expected outcome:**

`T-stat: -0.11632, p-value: 0.90761`

<br>

⚙️ **Task:** Replace the **\<MISSING CODE>** to print out the t-test result.

In [None]:
### START FUNCTION

def print_ttest_results(station_id, measurement, p_val, alpha):
    """
    Interprets and prints the results of a t-test based on the p-value.
    """
    if <MISSING CODE>:
        print(f"   Significant difference in {measurement} detected at Station  {station_id}, (P-Value: {p_val:.5f} < {alpha}). Null hypothesis rejected.")
    else:
        print(f"   No significant difference in {measurement} detected at Station  {station_id}, (P-Value: {p_val:.5f} > {alpha}). Null hypothesis not rejected.")

### END FUNCTION

**Input:**

In [None]:
# Example for station ID 0 and Temperature
station_id = 0

measurement = 'Temperature'

# Filter data for the specific station and measurement
field_values = filter_field_data(field_df, station_id, measurement)
weather_values = filter_weather_data(weather_df, station_id, measurement)

# Perform t-test
t_stat, p_val = run_ttest(field_values, weather_values)
print_ttest_results(station_id, measurement, p_val, alpha)

**Expected outcome:**

`No significant difference in Temperature detected (P-Value: 0.90761 > 0.05). Null hypothesis not rejected.`

Now we can put it all together in a loop.

<br>

⚙️ **Task:** Create a function that loops over `measurements_to_compare` and all `station_id`, perform a t-test and print the results. The function should accept `field_df`, `weather_df`, `list_measurements_to_compare`, `alpha`. the value of `alpha` should default to a value of 0.05. Hint: use `print_ttest_results()`.

In [None]:
### START FUNCTION
def hypothesis_results(field_df, weather_df, list_measurements_to_compare, alpha = 0.05):
    for measurement in list_measurements_to_compare:
        for station_id in field_df['station_id'].unique():
            Column_A = field_df[(field_df['station_id'] == station_id) & (field_df['measurement'] == measurement)]['value']
            Column_B = weather_df[(weather_df['measurement'] == measurement)]['value']
            t_statisctic, p_value = stats.ttest_ind(Column_A, Column_B)
            print_ttest_results(measurement, station_id,  t_statisctic, p_value, alpha)

### END FUNCTION

**Input:**

In [None]:
alpha = 0.05
hypothesis_results(field_df, weather_df, measurements_to_compare, alpha)

**Expected outcome:**
```python 
   No significant difference in Temperature detected at Station 0, (P-Value: 0.90761 > 0.05). Null hypothesis not rejected.
   No significant difference in Rainfall detected at Station 0, (P-Value: 0.21621 > 0.05). Null hypothesis not rejected.
   No significant difference in Pollution_level detected at Station 0, (P-Value: 0.56418 > 0.05). Null hypothesis not rejected.
   No significant difference in Temperature detected at Station 1, (P-Value: 0.47241 > 0.05). Null hypothesis not rejected.
   No significant difference in Rainfall detected at Station 1, (P-Value: 0.54499 > 0.05). Null hypothesis not rejected.
   No significant difference in Pollution_level detected at Station 1, (P-Value: 0.24410 > 0.05). Null hypothesis not rejected.
   No significant difference in Temperature detected at Station 2, (P-Value: 0.88671 > 0.05). Null hypothesis not rejected.
   No significant difference in Rainfall detected at Station 2, (P-Value: 0.36466 > 0.05). Null hypothesis not rejected.
   No significant difference in Pollution_level detected at Station 2, (P-Value: 0.99388 > 0.05). Null hypothesis not rejected.
   No significant difference in Temperature detected at Station 3, (P-Value: 0.66445 > 0.05). Null hypothesis not rejected.
   No significant difference in Rainfall detected at Station 3, (P-Value: 0.39847 > 0.05). Null hypothesis not rejected.
   No significant difference in Pollution_level detected at Station 3, (P-Value: 0.15466 > 0.05). Null hypothesis not rejected.
   No significant difference in Temperature detected at Station 4, (P-Value: 0.88575 > 0.05). Null hypothesis not rejected.
   No significant difference in Rainfall detected at Station 4, (P-Value: 0.33237 > 0.05). Null hypothesis not rejected.
   No significant difference in Pollution_level detected at Station 4, (P-Value: 0.21508 > 0.05). Null hypothesis not rejected.
   ```

Great! There we go. For all of our measurements the p-value > alpha, so there is not enough evidence to reject the null hypothesis. This means we have no evidence to suggest that the weather data is different from the field data. This makes us confident that our field data, at least in terms of temperature, rainfall, and pollution level is reflecting the reality. 

Why was this important? Well, we saw from the EDA that there were some relationships, and possible correlations with the standard yield, but we really can't say what affects a crop's success, because all of them seemed to. In a sense, we as humans could not clearly see the relationships, if we were given a set of conditions like rainfall, pH, and crop type, we could not reliably estimate what the standard yield of a crop is, because the relationships are hard to understand.

So our next step is to allow a machine to look for patterns, which is Machine Learning (ML). Computers are not limited to three dimensions, can calculate for hours, and find hidden patterns we cannot. Machine learning follows the basic principle across computational domains; junk in, junk out. We needed to make sure that the data we're feeding into ML models is accurate. Now we know, and we're ready for the next step. 

You must have been itching to get into AI, so we'll dive in soon.

Until then, look after yourself!
Saana

#  

<div align="center" style=" font-size: 80%; text-align: center; margin: 0 auto">
<img src="https://raw.githubusercontent.com/Explore-AI/Pictures/master/ExploreAI_logos/EAI_Blue_Dark.png"  style="width:200px";/>
</div>