<div align="center" style=" font-size: 80%; text-align: center; margin: 0 auto">
<img src="https://raw.githubusercontent.com/Explore-AI/Pictures/master/Python-Notebook-Banners/Code_challenge.png"  style="display: block; margin-left: auto; margin-right: auto;";/>
</div>

# Integrated project: Validating our data
© ExploreAI Academy

In this Code Challenge we’re diving into the agricultural dataset again to continue to validate our data. Before we do that, we’re pausing to build a data pipeline that will ingest and clean our data with the press of a button, cleaning up our code significantly. Once that’s ready, we’ll complete our data validation.

⚠️ **NOTE that this code challenge is graded and will contribute to your overall marks for this module. Submit this notebook for grading. Note that the names of the functions are different in this notebook. Transfer the code in your notebook to this submission notebook**

### Instructions

- **Do not add or remove cells in this notebook. Do not edit or remove the `### START FUNCTION` or `### END FUNCTION` comments. Do not add any code outside of the functions you are required to edit. Doing any of this will lead to a mark of 0%!**

- Answer the questions according to the specifications provided.

- Use the given cell in each question to see if your function matches the expected outputs.

- Do not hard-code answers to the questions.

- The use of StackOverflow, Google, and other online tools is permitted. However, copying a fellow student's code is not permissible and is considered a breach of the Honour code. Doing this will result in a mark of 0%.

# Introduction

Let's pick up where we left off, shall we?

Recall our previous disappointment - the mismatch between our dataset and the weather station data? That was a curveball, wasn't it? Half of our measurements were out of range, raising eyebrows and doubts alike. Our quest for data validation had hit a snag, but as any seasoned data scientist will tell you, every problem is a hidden opportunity waiting to be discovered.

I'm sure you have some ideas on how to solve this, so I'll share mine...

One thing that bugs me is the tolerance level I chose. Why 1.5%? 

We saw that half of our means were not within that tolerance, but why would it be within 1.5% and not 5%?

Although we may think 1.5% is a good measurement of accuracy, there may be errors in each dataset, and more variation in one than the other, which could make the means differ by more than 1.5% even if the objective truth is that the means are the same. We need to be a little more scientific with our approach and account for the difference in the data. I am sure you know what I am about to say next, right?

**Hypothesis testing** takes into account both the means and the variances of the distributions being compared. The variance here is crucial because it gives us insight into the spread of the data points around the means for our two datasets. Two samples could have the same mean but very different variances, leading to different interpretations of their similarities or differences.

Our main goal is the same: Is the data in our `MD_agric_df` dataset representative of reality? To answer this, we use weather-related data from nearby stations to validate our results. If the weather data matches the data we have, we can be more confident that our dataset represents reality. 

So what's the plan? 
1. Create a null hypothesis.
1. Import the `MD_agric_df` dataset and clean it up.
1. Import the weather data.
1. Map the weather data to the field data.
1. Calculate the means of the weather station dataset and the means of the main dataset.
2. Calculate all the parameters we need to do a t-test. 
3. Interpret our results.

Do you see a bit of repetition here? Steps 2-5 are a repeat of last time. The quick and dirty fix is to copy that code across to our notebook, but what about next time and the time after that? All of that code will also clutter this notebook and make our analysis harder to read.

You might also think of exporting the fixed and merged data from last time to a CSV, but what if there is new data in the database?

So, let's stop for a second and think about how we can make this simpler, more extendible and reusable in the future. 

# Data dictionary

**1. Geographic features**

- **Field_ID:** A unique identifier for each field (BigInt).
 
- **Elevation:** The elevation of the field above sea level in metres (Float).

- **Latitude:** Geographical latitude of the field in degrees (Float).

- **Longitude:** Geographical longitude of the field in degrees (Float).

- **Location:** Province the field is in (Text).

- **Slope:** The slope of the land in the field (Float).

**2. Weather features**

- **Field_ID:** Corresponding field identifier (BigInt).

- **Rainfall:** Amount of rainfall in the area in mm (Float).

- **Min_temperature_C:** Average minimum temperature recorded in Celsius (Float).

- **Max_temperature_C:** Average maximum temperature recorded in Celsius (Float).

- **Ave_temps:** Average temperature in Celcius (Float).

**3. Soil and crop features**

- **Field_ID:** Corresponding field identifier (BigInt).

- **Soil_fertility:** A measure of soil fertility where 0 is infertile soil, and 1 is very fertile soil (Float).

- **Soil_type:** Type of soil present in the field (Text).

- **pH:** pH level of the soil, which is a measure of how acidic/basic the soil is (Float).

**4. Farm management features**

- **Field_ID:** Corresponding field identifier (BigInt).

- **Pollution_level:** Level of pollution in the area where 0 is unpolluted and 1 is very polluted (Float).

- **Plot_size:** Size of the plot in the field (Ha) (Float).

- **Chosen_crop:** Type of crop chosen for cultivation (Text).

- **Annual_yield:** Annual yield from the field (Float). This is the total output of the field. The field size and type of crop will affect the Annual Yield

- **Standard_yield:** Standardised yield expected from the field, normalised per crop (Float). This is independent of field size, or crop type. Multiplying this number by the field size, and average crop yield will give the Annual_Yield.

<br>

**Weather_station_data (CSV)**

- **Weather_station_ID:** The weather station the data originated from. (Int)

- **Message:** The weather data was captured by sensors at the stations, in the format of text messages.(Str)

**Weather_data_field_mapping (CSV)**

- **Field_ID:** The id of the field that is connected to a weather station. This is the key we can use to join the weather station ID to the original data. (Int)

- **Weather_station_ID:** The weather station that is connected to a field. If a field has `weather_station_ID = 0` then that field is closest to weather station 0. (Int)

<br>

# Dealing with a friendly warning from Pandas

If you are running this notebook in `Python 3.12` or later, you might get a warning if you run the imports below:

In [10]:
import re
import numpy as np
import pandas as pd
import os

If you are lucky enough to see this warning, it let's us know that Pandas is changing soon and will require another package to be installed. 

```python
...2334042735.py:3: DeprecationWarning: 
Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd
```

We can safely ignore these warnings, but soon our script will fail to import Pandas, so let's fix it today, and we won't have to worry about it for a long time. The warning tells us that Pyarrow will soon be a requirement to import Pandas, so we can just install it with pip. 

In [11]:
%pip install Pyarrow

Defaulting to user installation because normal site-packages is not writeableNote: you may need to restart the kernel to use updated packages.



# Cleaning up our data pipeline

When we pulled in the data last time, we actually made an assumption that our script worked. We assumed that all the fixes we made to the data actually made it in, and assumed that the database didn't change. But what if someone added more records, fixed the data on the database that we were making in our notebook, or added a new column of data? 

Last time we started to build what's known as a **data pipeline**. We often do this as data scientists. The data we work with is almost always stored in databases, and we don't want to transform, clean up or make changes to the database unless it is really beneficial. So we retrieve the data from the database, and in our workspace, we shape the data into a useful format for our goal. 

Last time we imported data with this code: (⚠️ Don't run it)

In [12]:
import pandas as pd # importing the Pandas package with an alias, pd
from sqlalchemy import create_engine, text # Importing the SQL interface. If this fails, run !pip install sqlalchemy in another cell.
import matplotlib.pyplot as plt
import seaborn as sns


# Create an engine for the database
engine = create_engine('sqlite:///Maji_Ndogo_farm_survey_small.db') #Make sure to have the .db file in the same directory as this notebook, and the file name matches.

In [13]:
sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""

# Create a connection object
with engine.connect() as connection:
    
    # Use Pandas to execute the query and store the result in a DataFrame
    MD_agric_df = pd.read_sql_query(text(sql_query), connection)

In [14]:
MD_agric_df.rename(columns={'Annual_yield': 'Crop_type_Temp', 'Crop_type': 'Annual_yield'}, inplace=True)
MD_agric_df.rename(columns={'Crop_type_Temp': 'Crop_type'}, inplace=True)
MD_agric_df['Elevation'] = MD_agric_df['Elevation'].abs()

# Correcting 'Crop_type' column
def correct_crop_type(crop):
    crop = crop.strip()  # Remove trailing spaces
    corrections = {
        'cassaval': 'cassava',
        'wheatn': 'wheat',
        'teaa': 'tea'
    }
    return corrections.get(crop, crop)  # Get the corrected crop type, or return the original if not in corrections

# Apply the correction function to the Crop_type column
MD_agric_df['Crop_type'] = MD_agric_df['Crop_type'].apply(correct_crop_type)

In [15]:
weather_station_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv")
weather_station_mapping_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv")

In [16]:
import re # Importing the regex pattern
import numpy as np


patterns = {
    'Rainfall': r'(\d+(\.\d+)?)\s?mm',
     'Temperature': r'(\d+(\.\d+)?)\s?C',
    'Pollution_level': r'=\s*(-?\d+(\.\d+)?)|Pollution at \s*(-?\d+(\.\d+)?)'
    }

def extract_measurement(message):
    """
    Extracts a numeric measurement value from a given message string.

    The function applies regular expressions to identify and extract
    numeric values related to different types of measurements such as
    Rainfall, Average Temperatures, and Pollution Levels from a text message.
    It returns the key of the matching record, and first matching value as a floating-point number.
    
    Parameters:
    message (str): A string message containing the measurement information.

    Returns:
    float: The extracted numeric value of the measurement if a match is found;
           otherwise, None.

    The function uses the following patterns for extraction:
    - Rainfall: Matches numbers (including decimal) followed by 'mm', optionally spaced.
    - Ave_temps: Matches numbers (including decimal) followed by 'C', optionally spaced.
    - Pollution_level: Matches numbers (including decimal) following 'Pollution at' or '='.
    
    Example usage:
    extract_measurement("【2022-01-04 21:47:48】温度感应: 现在温度是 12.82C.")
    # Returns: 'Temperature', 12.82
    """
    
    for key, pattern in patterns.items(): # Loop through all of the patterns and check if it matches the pattern value.
        match = re.search(pattern, message)
        if match:
            # Extract the first group that matches, which should be the measurement value if all previous matches are empty.
            # print(match.groups()) # Uncomment this line to help you debug your regex patterns.
            return key, float(next((x for x in match.groups() if x is not None)))
    
    return None, None

# The function creates a tuple with the measurement type and value into a Pandas Series
result = weather_station_df['Message'].apply(extract_measurement)

# Create separate columns for 'Measurement' and 'extracted_value' by unpacking the tuple with Lambda functions.
weather_station_df['Measurement'] = result.apply(lambda x: x[0])
weather_station_df['Value'] = result.apply(lambda x: x[1])

In [17]:
# The function creates a tuple with the measurement type and value into a Pandas Series
result = weather_station_df['Message'].apply(extract_measurement)

# Create separate columns for 'Measurement' and 'extracted_value' by unpacking the tuple with Lambda functions.
weather_station_df['Measurement'] = result.apply(lambda x: x[0])
weather_station_df['Value'] = result.apply(lambda x: x[1])

weather_station_means = weather_station_df.groupby(by = ['Weather_station_ID','Measurement'])['Value'].mean(numeric_only = True)
weather_station_means = weather_station_means.unstack()
weather_station_means

Measurement,Pollution_level,Rainfall,Temperature
Weather_station_ID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,0.352791,1575.95375,13.4039
1,0.257333,577.38391,12.998899
2,0.043858,1690.955324,13.179717
3,0.23819,905.191397,13.256779
4,0.128261,1200.183505,13.188571


In [18]:
# Use this line of code to see which messages are not assigned yet.
weather_station_df[(weather_station_df['Measurement'] == None)|(weather_station_df['Value'].isna())]

Unnamed: 0,Weather_station_ID,Message,Measurement,Value


In [19]:

MD_agric_df = MD_agric_df.merge(weather_station_mapping_df,on = 'Field_ID', how='left')
MD_agric_df.drop(columns="Unnamed: 0")
MD_agric_df_weather_means = MD_agric_df.groupby("Weather_station").mean(numeric_only = True)[['Pollution_level','Rainfall', 'Ave_temps']]

MD_agric_df_weather_means = MD_agric_df_weather_means.rename(columns = {'Ave_temps':"Temperature"})
MD_agric_df_weather_means

Unnamed: 0_level_0,Pollution_level,Rainfall,Temperature
Weather_station,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,0.367706,1522.894473,13.393418
1,0.244627,568.445319,13.058085
2,0.04381,1705.83428,13.168364
3,0.271464,931.60786,13.224666
4,0.139913,1168.9918,13.197995


We have to copy that code across from another notebook, and we may miss some blocks or lines of code. We may not copy over the code in the right order, and we may not have documented the code well either. 

Do you also think this is a massive block of code that feels like it is in the way of our analysis? Is it simple to follow and simple to change? 

No! So let's spend a bit of time building a proper data pipeline that imports the data from the different sources we have, cleans up the data, and tests whether our data is what we expect it to be. We want to do all of this in a way that doesn't cause our notebook to be filled with thousands of lines of code and become a nightmare to debug. As a final step, we want to automate a few simple data validation checks in our code.

To do this we're going to re-organise or **refactor** our code into modules. We're going to reorganise all of the code into smaller modules so our code is more readable, maintainable and extendable. 

We can create a module to interact with the database, a module to transform and clean the field-related data and another module to process the weather data.

<br>

So here's the plan: 

1. Gather all of the code from our last "pipeline".

2. Re-organise the code into our new three modules: 

    a. `data_ingesation.py` - All SQL-related functions, and web-based data retrieval.

    b. `field_data_processor.py` - All transformations, cleanup, and merging functionality.

    c. `weather_data_processor.py` - All transformations and cleanup of the weather station data.

3. Copy our code into the modules and test their functionality.

4. Create automated data validation tests to ensure our data is as we expect it to be.

Once we're done with that, we're going to jump into the reason why we're here. So let's get started!

# Modules

Why are we doing this? Well, we want to reduce our data pipeline code to a couple of lines like this: 

In [20]:
field_processor.process()
field_df = field_processor.df

weather_processor.process()
field_df = field_processor.weather_df

NameError: name 'field_processor' is not defined

We're going to need a bit more code, but we want to press a button and go from databases and CSVs to a Pandas DataFrame. All of the code and processes are happening in the modules and we get the end result. The second big motivator is to make our code more modular. If we want to debug a problem in the field data, we know where to go, and if we want to import other IoT weather sensors we can just modify the weather data module.

The first challenge; automating the data ingestion. There are two places we're fetching data:
1. SQLite database - We need to create an SQLite engine, connect to the database, run a query and return a pandas DataFrame.
2. Web CSV file - Read the CSV data from the web, and import it as a DataFrame.

So let's start building!

## Data ingestion

Ok, so first up, let's grab the code that interacted with the database, and the web CSV files.

SQL:

In [None]:
import pandas as pd # importing the Pandas package with an alias, pd
from sqlalchemy import create_engine, text # Importing the SQL interface. If this fails, run !pip install sqlalchemy in another cell.
import matplotlib.pyplot as plt
import seaborn as sns


# Create an engine for the database
engine = create_engine('sqlite:///Maji_Ndogo_farm_survey_small.db') #Make sure to have the .db file in the same directory as this notebook, and the file name matches.


sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""

# Create a connection object
with engine.connect() as connection:
    
    # Use Pandas to execute the query and store the result in a DataFrame
    MD_agric_df = pd.read_sql_query(text(sql_query), connection)

CSV files: 

In [None]:
weather_station_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv")
weather_station_mapping_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv")

Creating modules in Jupyter notebooks is a bit of a pain. If we make changes to the `module.py` file, we have to restart the notebook kernel and import the module again in order to apply those changes. So, we're going to fully develop it, test it in this notebook, and only then, move it to a `data_ingestion.py` file, and import it. 

To create a module, our code should ideally be encapsulated in functions or classes. How do we choose?

Since the process of connecting to a database, and querying some data is relatively straightforward, functions seemed like the best fit. Functions allow us to encapsulate the necessary steps in clear, reusable blocks of code without the overhead of managing class objects. It's like using a **simple tool** for a specific task — pick it up, use it, and put it back without needing to remember anything about the last use.

On the other hand, our **data processing** modules are a bit more complex. These modules not only perform various operations on the data but also need to keep track of the data as it goes through these processes. For this, we use **classes** to **create DataFrame objects** as attributes. This approach simplifies data handling since we're passing the object around, which inherently knows its data and the operations it can perform within the class.

This idea ties back to OOP. Class objects are designed to deal with data and operations on that data via methods, while functions are made to do the simpler tasks.

So for **data ingestion**, we're just going to use functions.

So our **main task** will be to **convert the data ingestion code into functions** that we can call from the module.

So this code: 

In [None]:
# Create an engine for the database
engine = create_engine('sqlite:///Maji_Ndogo_farm_survey_small.db') #Make sure to have the .db file in the same directory as this notebook, and the file name matches.


sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""

# Create a connection object
with engine.connect() as connection:
    
    # Use Pandas to execute the query and store the result in a DataFrame
    MD_agric_df = pd.read_sql_query(text(sql_query), connection)

Becomes neat, modular functions: 

In [21]:
def create_db_engine(db_path):
    engine = create_engine(db_path)
    return engine

def query_data(engine, sql_query):
    with engine.connect() as connection:
        df = pd.read_sql_query(text(sql_query), connection)
        return df

So if we call:

In [22]:
create_db_engine('sqlite:///Maji_Ndogo_farm_survey_small.db')

Engine(sqlite:///Maji_Ndogo_farm_survey_small.db)

We get the SQL engine object which we can use with the query to connect to the database, and run a query.

In [23]:
SQL_engine = create_db_engine('sqlite:///Maji_Ndogo_farm_survey_small.db')

sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""


df = query_data(SQL_engine, sql_query)
df

Unnamed: 0,Field_ID,Elevation,Latitude,Longitude,Location,Slope,Rainfall,Min_temperature_C,Max_temperature_C,Ave_temps,Soil_fertility,Soil_type,pH,Pollution_level,Plot_size,Crop_type,Annual_yield,Standard_yield
0,40734,786.05580,-7.389911,-7.556202,Rural_Akatsi,14.795113,1125.2,-3.1,33.1,15.00,0.62,Sandy,6.169393,8.526684e-02,1.3,0.751354,cassava,0.577964
1,30629,674.33410,-7.736849,-1.051539,Rural_Sokoto,11.374611,1450.7,-3.9,30.6,13.35,0.64,Volcanic,5.676648,3.996838e-01,2.2,1.069865,cassava,0.486302
2,39924,826.53390,-9.926616,0.115156,Rural_Sokoto,11.339692,2208.9,-1.8,28.4,13.30,0.69,Volcanic,5.331993,3.580286e-01,3.4,2.208801,tea,0.649647
3,5754,574.94617,-2.420131,-6.592215,Rural_Kilimani,7.109855,328.8,-5.8,32.2,13.20,0.54,Loamy,5.328150,2.866871e-01,2.4,1.277635,cassava,0.532348
4,14146,886.35300,-3.055434,-7.952609,Rural_Kilimani,55.007656,785.2,-2.5,31.0,14.25,0.72,Sandy,5.721234,4.319027e-02,1.5,0.832614,wheat,0.555076
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
5649,11472,681.36145,-7.358371,-6.254369,Rural_Akatsi,16.213196,885.7,-4.3,33.4,14.55,0.61,Sandy,5.741063,3.286828e-01,1.1,0.609930,potato,0.554482
5650,19660,667.02120,-3.154559,-4.475046,Rural_Kilimani,2.397553,501.1,-4.8,32.1,13.65,0.54,Sandy,5.445833,1.602583e-01,8.7,3.812289,maize,0.438194
5651,41296,670.77900,-14.472861,-6.110221,Rural_Hawassa,7.636470,1586.6,-3.8,33.4,14.80,0.64,Volcanic,5.385873,8.221326e-09,2.1,1.681629,tea,0.800776
5652,33090,429.48840,-14.653089,-6.984116,Rural_Hawassa,13.944720,1272.2,-6.2,34.6,14.20,0.63,Silt,5.562508,6.917245e-10,1.3,0.659874,cassava,0.507595


We can even call the `create_db_engine()` function inside the `query_data()` function:

In [24]:
sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""


df = query_data(create_db_engine('sqlite:///Maji_Ndogo_farm_survey_small.db'), sql_query)
df

Unnamed: 0,Field_ID,Elevation,Latitude,Longitude,Location,Slope,Rainfall,Min_temperature_C,Max_temperature_C,Ave_temps,Soil_fertility,Soil_type,pH,Pollution_level,Plot_size,Crop_type,Annual_yield,Standard_yield
0,40734,786.05580,-7.389911,-7.556202,Rural_Akatsi,14.795113,1125.2,-3.1,33.1,15.00,0.62,Sandy,6.169393,8.526684e-02,1.3,0.751354,cassava,0.577964
1,30629,674.33410,-7.736849,-1.051539,Rural_Sokoto,11.374611,1450.7,-3.9,30.6,13.35,0.64,Volcanic,5.676648,3.996838e-01,2.2,1.069865,cassava,0.486302
2,39924,826.53390,-9.926616,0.115156,Rural_Sokoto,11.339692,2208.9,-1.8,28.4,13.30,0.69,Volcanic,5.331993,3.580286e-01,3.4,2.208801,tea,0.649647
3,5754,574.94617,-2.420131,-6.592215,Rural_Kilimani,7.109855,328.8,-5.8,32.2,13.20,0.54,Loamy,5.328150,2.866871e-01,2.4,1.277635,cassava,0.532348
4,14146,886.35300,-3.055434,-7.952609,Rural_Kilimani,55.007656,785.2,-2.5,31.0,14.25,0.72,Sandy,5.721234,4.319027e-02,1.5,0.832614,wheat,0.555076
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
5649,11472,681.36145,-7.358371,-6.254369,Rural_Akatsi,16.213196,885.7,-4.3,33.4,14.55,0.61,Sandy,5.741063,3.286828e-01,1.1,0.609930,potato,0.554482
5650,19660,667.02120,-3.154559,-4.475046,Rural_Kilimani,2.397553,501.1,-4.8,32.1,13.65,0.54,Sandy,5.445833,1.602583e-01,8.7,3.812289,maize,0.438194
5651,41296,670.77900,-14.472861,-6.110221,Rural_Hawassa,7.636470,1586.6,-3.8,33.4,14.80,0.64,Volcanic,5.385873,8.221326e-09,2.1,1.681629,tea,0.800776
5652,33090,429.48840,-14.653089,-6.984116,Rural_Hawassa,13.944720,1272.2,-6.2,34.6,14.20,0.63,Silt,5.562508,6.917245e-10,1.3,0.659874,cassava,0.507595


This seems simple, but let's think for a second about what could go wrong. For example, what if there is a problem like the database has a schema, and no actual data? Or our query doesn't return any data.

In [25]:
SQL_engine = create_db_engine('sqlite:///Maji_Ndogo_farm_survey_small.db')

sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
WHERE Rainfall < 0 
"""
# The last line won't ever be true, so no results will be returned. 

df = query_data(SQL_engine, sql_query)
df

Unnamed: 0,Field_ID,Elevation,Latitude,Longitude,Location,Slope,Rainfall,Min_temperature_C,Max_temperature_C,Ave_temps,Soil_fertility,Soil_type,pH,Pollution_level,Plot_size,Crop_type,Annual_yield,Standard_yield


We get an empty DataFrame, because SQL returned an empty query result. When we try to filter results, we get an answer that would not make sense.

In [26]:
df['Rainfall'] > 100

Series([], Name: Rainfall, dtype: bool)

So to avoid this we need to add error handling into our code so that we stop the process if something is wrong, and tell us what the problem is before we continue. 

Secondly, to help us understand how our code is executing we're going to add some logs. While print statements can help us to debug our code, we have to remove them once our code goes into use, one by one. `logging` is a better way to debug our code than print statements because we can add `logging.INFO()` logs to know what our code is doing, and `logging.DEBUG()` statements that have more detail in case we want to debug a specific loop in a bit more in detail. There are also various other tools to use, and we can also silence all logging with a single line of code. If we used print statements, we will have to comment them out one by one. 

If we apply these two ideas, we get the code below:

In [27]:
import logging
import pandas as pd

# Name our logger so we know that logs from this module come from the data_ingestion module
logger = logging.getLogger('data_ingestion')

# Set a basic logging message up that prints out a timestamp, the name of our logger, and the message
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')


def create_db_engine(db_path):
    try:
        engine = create_engine(db_path)
        # Test connection
        with engine.connect() as conn:
            pass
        # test if the database engine was created successfully
        logger.info("Database engine created successfully.")
        return engine # Return the engine object if it all works well
    except ImportError: #If we get an ImportError, inform the user SQLAlchemy is not installed
        logger.error("SQLAlchemy is required to use this function. Please install it first.")
        raise e
    except Exception as e:# If we fail to create an engine inform the user
        logger.error(f"Failed to create database engine. Error: {e}")
        raise e
    
def query_data(engine, sql_query):
    try:
        with engine.connect() as connection:
            df = pd.read_sql_query(text(sql_query), connection)
        if df.empty:
            # Log a message or handle the empty DataFrame scenario as needed
            msg = "The query returned an empty DataFrame."
            logger.error(msg)
            raise ValueError(msg)
        logger.info("Query executed successfully.")
        return df
    except ValueError as e: 
        logger.error(f"SQL query failed. Error: {e}")
        raise e
    except Exception as e:
        logger.error(f"An error occurred while querying the database. Error: {e}")
        raise e

Now when we run the incorrect query again:

In [28]:
SQL_engine = create_db_engine('sqlite:///Maji_Ndogo_farm_survey_small.db')

sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
WHERE Rainfall < 0 
"""
# The last line won't ever be true, so no results will be returned. 

df = query_data(SQL_engine, sql_query)
df

2024-05-26 00:24:15,230 - data_ingestion - INFO - Database engine created successfully.
2024-05-26 00:24:15,237 - data_ingestion - ERROR - The query returned an empty DataFrame.
2024-05-26 00:24:15,238 - data_ingestion - ERROR - SQL query failed. Error: The query returned an empty DataFrame.


ValueError: The query returned an empty DataFrame.

We get a log of what happened, and we now get an error telling us there is something wrong with the DataFrame, and we are prevented from processing it further.

Next up, let's include the CSV data handling. This is the original code: 

In [29]:
weather_station_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv")
weather_station_mapping_df = pd.read_csv("https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv")

These two files are imported in the same way, so we can use one function to do it.

In [30]:
weather_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv"
weather_mapping_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"


def read_from_web_CSV(URL):
    try:
        df = pd.read_csv(URL)
        logger.info("CSV file read successfully from the web.")
        return df
    except pd.errors.EmptyDataError as e:
        logger.error("The URL does not point to a valid CSV file. Please check the URL and try again.")
        raise e
    except Exception as e:
        logger.error(f"Failed to read CSV from the web. Error: {e}")
        raise e
    
    
weather_df = read_from_web_CSV(weather_data_URL)
weather_mapping_data = read_from_web_CSV(weather_mapping_data_URL)

2024-05-26 00:24:18,136 - data_ingestion - INFO - CSV file read successfully from the web.
2024-05-26 00:24:18,901 - data_ingestion - INFO - CSV file read successfully from the web.


Great! Now our code can connect to a database for the field data, use a query to retrieve data and create a DataFrame. We can also import CSV files from a URL into a DataFrame, and avoid pulling unexpected data. If we put all this code together we have the basic structure of our module. 

In [31]:

from sqlalchemy import create_engine, text
import logging
import pandas as pd

# Name our logger so we know that logs from this module come from the data_ingestion module
logger = logging.getLogger('data_ingestion')
# Set a basic logging message up that prints out a timestamp, the name of our logger, and the message
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'

sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""

weather_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv"
weather_mapping_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"

def create_db_engine(db_path):
    try:
        engine = create_engine(db_path)
        # Test connection
        with engine.connect() as conn:
            pass
        # test if the database engine was created successfully
        logger.info("Database engine created successfully.")
        return engine # Return the engine object if it all works well
    except ImportError: #If we get an ImportError, inform the user SQLAlchemy is not installed
        logger.error("SQLAlchemy is required to use this function. Please install it first.")
        raise e
    except Exception as e:# If we fail to create an engine inform the user
        logger.error(f"Failed to create database engine. Error: {e}")
        raise e
    
def query_data(engine, sql_query):
    try:
        with engine.connect() as connection:
            df = pd.read_sql_query(text(sql_query), connection)
        if df.empty:
            # Log a message or handle the empty DataFrame scenario as needed
            msg = "The query returned an empty DataFrame."
            logger.error(msg)
            raise ValueError(msg)
        logger.info("Query executed successfully.")
        return df
    except ValueError as e: 
        logger.error(f"SQL query failed. Error: {e}")
        raise e
    except Exception as e:
        logger.error(f"An error occurred while querying the database. Error: {e}")
        raise e
    
def read_from_web_CSV(URL):
    try:
        df = pd.read_csv(URL)
        logger.info("CSV file read successfully from the web.")
        return df
    except pd.errors.EmptyDataError as e:
        logger.error("The URL does not point to a valid CSV file. Please check the URL and try again.")
        raise e
    except Exception as e:
        logger.error(f"Failed to read CSV from the web. Error: {e}")
        raise e


In [32]:
# Testing module functions  
field_df = query_data(create_db_engine(db_path), sql_query)   
weather_df = read_from_web_CSV(weather_data_URL)
weather_mapping_df = read_from_web_CSV(weather_mapping_data_URL)

2024-05-26 00:24:19,670 - data_ingestion - INFO - Database engine created successfully.
2024-05-26 00:24:19,778 - data_ingestion - INFO - Query executed successfully.
2024-05-26 00:24:20,412 - data_ingestion - INFO - CSV file read successfully from the web.
2024-05-26 00:24:21,109 - data_ingestion - INFO - CSV file read successfully from the web.


Once we run this we should get a log telling us that it all worked. 

Note that there are imports at the top of the cell. When we move this to a module, it needs to import packages like SQL Alchemy and Pandas. 

Let's do a test to make sure these functions are working well:

In [33]:
field_test = field_df.shape
weather_test = weather_df.shape
weather_mapping_test = weather_mapping_df.shape
print(f"field_df: {field_test}, weather_df: {weather_test}, weather_mapping_df: {weather_mapping_test}")

field_df: (5654, 18), weather_df: (1843, 2), weather_mapping_df: (5654, 3)


**Expected outcome:** 

`field_df: (5654, 18), weather_df: (1843, 2), weather_mapping_df: (5654, 3)`

Before we go and move this into the `data_ingestion.py` file, we are missing one **crucial** aspect. 

**Documentation!** We need to add a module docstring and function docstrings. 

<br>

⚙️ **Task:** Create a **module docstring** and **function docstrings** for each function. Refer to your notes and PEP 8 guidelines (also see `PEP 257`) to guide you on what these docstrings should contain. Edit the code below to make these changes. 

>⚠️ Do not change the logic of this code, only add documentation.

In [34]:
"""Module: data_ingestion

This module provides functions to ingest data from various sources.

"""
from sqlalchemy import create_engine, text
import logging
import pandas as pd
# Name our logger so we know that logs from this module come from the data_ingestion module
logger = logging.getLogger('data_ingestion')
# Set a basic logging message up that prints out a timestamp, the name of our logger, and the message
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
db_path = 'sqlite:///Maji_Ndogo_farm_survey_small.db'

sql_query = """
SELECT *
FROM geographic_features
LEFT JOIN weather_features USING (Field_ID)
LEFT JOIN soil_and_crop_features USING (Field_ID)
LEFT JOIN farm_management_features USING (Field_ID)
"""

weather_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_station_data.csv"
weather_mapping_data_URL = "https://raw.githubusercontent.com/Explore-AI/Public-Data/master/Maji_Ndogo/Weather_data_field_mapping.csv"

In [35]:
### START FUNCTION
"""Module: data_ingestion

This module provides functions to ingest data from various sources.

"""
from sqlalchemy import create_engine, text
import logging
import pandas as pd

# Name our logger so we know that logs from this module come from the data_ingestion module
logger = logging.getLogger('data_ingestion')

# Set a basic logging message up that prints out a timestamp, the name of our logger, and the message
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

def create_db_engine(db_path):
    """Create a SQLAlchemy database engine.

    Args:
        db_path (str): The path to the database.

    Returns:
        sqlalchemy.engine.base.Engine: The SQLAlchemy engine object.
    """
    try:
        engine = create_engine(db_path)
        # Test connection
        with engine.connect() as conn:
            pass
        # test if the database engine was created successfully
        logger.info("Database engine created successfully.")
        return engine # Return the engine object if it all works well
    except ImportError: #If we get an ImportError, inform the user SQLAlchemy is not installed
        logger.error("SQLAlchemy is required to use this function. Please install it first.")
        raise e
    except Exception as e:# If we fail to create an engine inform the user
        logger.error(f"Failed to create database engine. Error: {e}")
        raise e
    
def query_data(engine, sql_query):
    """Execute a SQL query on a database engine and return the result as a DataFrame.

    Args:
        engine (sqlalchemy.engine.base.Engine): The SQLAlchemy engine object.
        sql_query (str): The SQL query to execute.

    Returns:
        pandas.DataFrame: The result of the SQL query as a DataFrame.
    """
    try:
        with engine.connect() as connection:
            df = pd.read_sql_query(text(sql_query), connection)
        if df.empty:
            # Log a message or handle the empty DataFrame scenario as needed
            msg = "The query returned an empty DataFrame."
            logger.error(msg)
            raise ValueError(msg)
        logger.info("Query executed successfully.")
        return df
    except ValueError as e: 
        logger.error(f"SQL query failed. Error: {e}")
        raise e
    except Exception as e:
        logger.error(f"An error occurred while querying the database. Error: {e}")
        raise e
    
def read_from_web_CSV(URL):
    """Read a CSV file from a web URL and return it as a DataFrame.

    Args:
        URL (str): The URL of the CSV file.

    Returns:
        pandas.DataFrame: The CSV file contents as a DataFrame.
    """
    try:
        df = pd.read_csv(URL)
        logger.info("CSV file read successfully from the web.")
        return df
    except pd.errors.EmptyDataError as e:
        logger.error("The URL does not point to a valid CSV file. Please check the URL and try again.")
        raise e
    except Exception as e:
        logger.error(f"Failed to read CSV from the web. Error: {e}")
        raise e
    
### END FUNCTION

Let's test our code to make sure it works before we move on.

In [36]:
# Testing module functions  
field_df = query_data(create_db_engine(db_path), sql_query)   
weather_df = read_from_web_CSV(weather_data_URL)
weather_mapping_df = read_from_web_CSV(weather_mapping_data_URL)

field_test = field_df.shape
weather_test = weather_df.shape
weather_mapping_test = weather_mapping_df.shape
print(f"field_df: {field_test}, weather_df: {weather_test}, weather_mapping_df: {weather_mapping_test}")

2024-05-26 00:24:32,352 - data_ingestion - INFO - Database engine created successfully.
2024-05-26 00:24:32,452 - data_ingestion - INFO - Query executed successfully.
2024-05-26 00:24:33,166 - data_ingestion - INFO - CSV file read successfully from the web.
2024-05-26 00:24:33,879 - data_ingestion - INFO - CSV file read successfully from the web.


field_df: (5654, 18), weather_df: (1843, 2), weather_mapping_df: (5654, 3)


#  

<div align="center" style=" font-size: 80%; text-align: center; margin: 0 auto">
<img src="https://raw.githubusercontent.com/Explore-AI/Pictures/master/ExploreAI_logos/EAI_Blue_Dark.png"  style="width:200px";/>
</div>