### Simplified Notes on Extracting Data from Structured Sources

#### 1. **Introduction to Data Extraction**
- The first step in any data pipeline is extracting data from a **source system**.
- This can include files, databases, APIs, or even web scraping.

---

#### 2. **Types of Source Systems**
- **Static File Types**: CSV, Parquet, JSON
- **Dynamic Sources**: SQL databases
- **External Sources**: APIs (for third-party data), Web scraping
- **Enterprise Systems**: Data lakes and data warehouses

---

#### 3. **Reading Parquet Files**
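- **Parquet** is a columnar storage file format; for large datasets it is typically smaller on disk and faster to read than CSV.
- Use `pd.read_parquet("file_path", engine="fastparquet")` to read a Parquet file into a DataFrame. The `fastparquet` package must be installed; if `engine` is omitted, pandas picks an available engine itself, trying `pyarrow` first.
- The call mirrors `pd.read_csv()`, but column types are stored in the file, so they do not have to be inferred.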

---

#### 4. **Extracting from SQL Databases**
- Use `pandas.read_sql(query, connection)` to load data from SQL into a DataFrame.
- First, create an **engine** with SQLAlchemy; it serves as the connection object passed to `read_sql`:
  ```python
  from sqlalchemy import create_engine
  engine = create_engine("postgresql+psycopg2://user:password@host:port/database")
  ```
- Use any SQL `SELECT` query to pull data.
- This allows flexible querying and efficient data extraction.
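
As a minimal sketch of the pattern above, the snippet below runs a parameterized `SELECT` through `pd.read_sql`. It assumes an in-memory SQLite database from the standard library as a stand-in for PostgreSQL; the `sales` table, its columns, and its rows are made up for illustration.

```python
import sqlite3

import pandas as pd

# Assumption: an in-memory SQLite database stands in for the real source system.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (order_id INTEGER, product TEXT, quantity_ordered INTEGER)")
conn.executemany(
    "INSERT INTO sales VALUES (?, ?, ?)",
    [(1, "cable", 1), (2, "monitor", 2), (3, "headphones", 1)],
)
conn.commit()

# Pass values through `params` instead of formatting them into the SQL string.
query = (
    "SELECT order_id, product FROM sales "
    "WHERE quantity_ordered = ? ORDER BY order_id"
)
single_item_orders = pd.read_sql(query, conn, params=(1,))

print(single_item_orders)
conn.close()
```

Selecting only the needed columns and rows in the query keeps the filtering work in the database, so less data is transferred into the DataFrame.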

---

#### 5. **Modular Code Design**
- Separate the pipeline into three main parts:
  - **Extract**
  - **Transform**
  - **Load**
- Define each step in its own function to make the code:
  - Reusable
  - Easier to read and maintain
  - Aligned with the **DRY principle** (Don't Repeat Yourself)

Example:
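```python
import pandas as pd

def extract_from_parquet(path):
    # Read a Parquet file into a DataFrame
    return pd.read_parquet(path)

def extract_from_sql(query, conn):
    # Run a SQL query and return the result as a DataFrame
    return pd.read_sql(query, conn)
```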
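
The three-part split can be sketched end to end as follows. This is an illustrative outline, not a prescribed layout: the column names and sample rows are hypothetical, and in-memory buffers stand in for real source and destination files.

```python
import io

import pandas as pd


def extract(source):
    """Read raw CSV data from a path or file-like object."""
    return pd.read_csv(source)


def transform(raw_data):
    """Keep only orders with more than one item."""
    return raw_data.loc[raw_data["quantity"] > 1, :]


def load(clean_data, destination):
    """Write the cleaned data as CSV without the index column."""
    clean_data.to_csv(destination, index=False)


# Hypothetical sample data; in-memory buffers stand in for real files.
source = io.StringIO("order_id,quantity\n1,1\n2,3\n3,2\n")
destination = io.StringIO()

load(transform(extract(source)), destination)
print(destination.getvalue())
```

Because each function takes its input as an argument, any one step can be swapped or tested in isolation without touching the other two.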

---

#### 6. **Time to Practice**
- You’ll get hands-on experience with extracting data from sources like Parquet files and SQL databases.

---

These notes summarize how to extract data efficiently using Python and pandas, and how to structure your code for scalable pipeline development.

### **Extracting data from parquet files**
One of the most common ways to ingest data from a source system is by reading data from a file, such as a CSV file. As data has gotten bigger, the need for better file formats has brought about new column-oriented file types, such as parquet files.

In this exercise, you'll practice extracting data from a parquet file.



In [None]:
import pandas as pd

# Read the sales data into a DataFrame
sales_data = pd.read_parquet("sales_data.parquet", engine="fastparquet")

# Check the data type of the columns of the DataFrames
print(sales_data.dtypes)

# Print the shape of the DataFrame, as well as the head
print(sales_data.shape)
print(sales_data.head())


### **Pulling data from SQL databases**
SQL databases are one of the most used data storage tools in the world. Many companies have teams of several individuals responsible for creating and maintaining these databases, which typically store data crucial for day-to-day operations. These SQL databases are commonly used as source systems for a wide range of data pipelines.

For this exercise, pandas has been imported as pd. Best of luck!

In [None]:
import sqlalchemy

# Create a connection to the sales database
db_engine = sqlalchemy.create_engine("postgresql+psycopg2://repl:password@localhost:5432/sales")

# Query the sales table
raw_sales_data = pd.read_sql("select * from sales", db_engine)
print(raw_sales_data)


### **Building functions to extract data**
It's important to modularize code when building a data pipeline. This helps to make pipelines more readable and reusable, and can help to expedite troubleshooting efforts. Creating and using functions for distinct operations in a pipeline can even help when getting started on a new project by providing a framework to begin development.

pandas has been imported as pd, and sqlalchemy is ready to be used.

In [None]:
def extract():
    connection_uri = "postgresql+psycopg2://repl:password@localhost:5432/sales"
    db_engine = sqlalchemy.create_engine(connection_uri)
    raw_data = pd.read_sql("SELECT * FROM sales WHERE quantity_ordered = 1", db_engine)
    
    # Print the head of the DataFrame
    print(raw_data.head())
    
    # Return the extracted DataFrame
    return raw_data
    
# Call the extract() function
raw_sales_data = extract()


### Simplified Notes on Transforming Data with Pandas

#### 1. **Importance of Data Transformation**
- Transforming data is crucial in a data pipeline to ensure that it’s in the correct format and useful for downstream analysis.
- Using **pandas**, data transformation becomes simple, allowing you to filter rows, create new columns, change data types, and more.

---

#### 2. **Filtering Records with `.loc[]`**
- **`loc[]`** is a pandas indexer that selects rows and columns by label or boolean condition.
  - **Example 1**: Filter rows where the `"open"` value is greater than zero:
    ```python
    df.loc[df['open'] > 0, :]
    ```
  - **Example 2**: Keep only specific columns, like `"timestamps"`, `"open"`, and `"close"`:
    ```python
    df.loc[:, ['timestamps', 'open', 'close']]
    ```
  - **Combining** a row condition with a column selection:
    ```python
    df.loc[(df['open'] > 0), ['timestamps', 'open', 'close']]
    ```
- **`iloc[]`** is similar but uses integer-based indexing for rows and columns.
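
The following sketch extends the examples above to several row conditions at once and contrasts `loc[]` with `iloc[]`. The DataFrame contents are hypothetical.

```python
import pandas as pd

# Hypothetical price data for illustration.
df = pd.DataFrame(
    {
        "timestamps": ["2024-01-01", "2024-01-02", "2024-01-03"],
        "open": [0.0, 10.5, 11.0],
        "close": [0.0, 10.8, 10.2],
    }
)

# Each condition needs its own parentheses; combine with & (and) or | (or).
rising = df.loc[(df["open"] > 0) & (df["close"] > df["open"]), ["timestamps", "close"]]

# iloc selects by position: first two rows, first two columns.
first_block = df.iloc[:2, :2]

print(rising)
print(first_block)
```

Python's `and` and `or` keywords do not work element-wise on a Series, which is why the bitwise operators `&` and `|` are used here.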

---

#### 3. **Altering Data Types**
- **`to_datetime()`**: Convert columns to datetime format.
  - Example: Convert a `"timestamps"` column (string format) into datetime:
    ```python
    df['timestamps'] = pd.to_datetime(df['timestamps'], format='%Y%m%d%H%M%S')
    ```
  - Convert a Unix timestamp (milliseconds since 1970) to datetime:
    ```python
    df['timestamps'] = pd.to_datetime(df['timestamps'], unit='ms')
    ```
- This ensures timestamps are usable for time-based analysis.
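
A small sketch of both conversions on made-up values. It additionally passes `errors="coerce"`, which is not covered in the notes above: with it, values that cannot be parsed become `NaT` instead of raising an exception.

```python
import pandas as pd

# Hypothetical raw values: string timestamps and epoch milliseconds.
df = pd.DataFrame(
    {
        "as_text": ["20240102030405", "not a date"],
        "as_millis": [0, 86_400_000],
    }
)

# errors="coerce" turns unparseable values into NaT instead of raising.
df["as_text"] = pd.to_datetime(df["as_text"], format="%Y%m%d%H%M%S", errors="coerce")
df["as_millis"] = pd.to_datetime(df["as_millis"], unit="ms")

print(df.dtypes)
print(df)
```

Whether to coerce or to let the conversion fail is a design choice: coercing keeps the pipeline running, but the resulting `NaT` values should then be counted or logged so bad input does not pass silently.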

---

#### 4. **Validating Transformations**
- **`head()`**: Check the first few rows of the DataFrame to spot-check transformations.
  - Example:
    ```python
    df.head()
    ```
- **`nsmallest()`** & **`nlargest()`**: Check for the smallest or largest values in columns, useful when filtering data.
  - Example:
    ```python
    df['column'].nsmallest(5)
    ```
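
One way to turn these spot checks into an automatic check is sketched below, using hypothetical data. The smallest value that survives a `> 0` filter must itself be greater than zero, so `nsmallest()` gives a quick confirmation that the filter did what was intended.

```python
import pandas as pd

# Hypothetical data containing rows that the filter should drop.
raw = pd.DataFrame({"open": [0.0, 12.5, -1.0, 9.75], "close": [0.0, 12.9, 1.0, 9.5]})
clean = raw.loc[raw["open"] > 0, :]

# The smallest remaining value should still satisfy the filter condition.
smallest_open = clean["open"].nsmallest(1).iloc[0]
assert smallest_open > 0, f"Filter failed: found open={smallest_open}"

# Comparing shapes shows how many rows the filter removed.
rows_removed = raw.shape[0] - clean.shape[0]
print(f"Removed {rows_removed} rows; smallest open is {smallest_open}")
```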

---

#### 5. **Practice**
- Now that you know how to filter, transform, and validate data, it’s time to apply these skills in practice exercises!

---

These notes summarize the essential transformation steps in a data pipeline using pandas, making it easy to filter, modify, and validate your data for analysis.

### **Filtering pandas DataFrames**
Once data has been extracted from a source system, it's time to transform it! Often, source data may have more information than what is needed for downstream use cases. If this is the case, dimensionality should be reduced during the "transform" phase of the data pipeline.

pandas has been imported as pd, and the extract() function is available to load a DataFrame from the path that is passed.

In [None]:
# Extract data from the sales_data.parquet path
raw_sales_data = extract("sales_data.parquet")

def transform(raw_data):
    # Only keep rows with `Quantity Ordered` greater than 1
    clean_data = raw_data.loc[raw_data["Quantity Ordered"] > 1, :]

    # Only keep columns "Order Date", "Quantity Ordered", and "Purchase Address"
    clean_data = clean_data.loc[:, ["Order Date", "Quantity Ordered", "Purchase Address"]]
    
    # Return the filtered DataFrame
    return clean_data
    
transform(raw_sales_data)


### **Transforming sales data with pandas**
Before insights can be extracted from a dataset, column types may need to be altered to properly leverage the data. This is especially common with temporal data types, which can be stored in several different ways.

For this example, pandas has been imported as pd and is ready for you to use.

In [None]:
raw_sales_data = extract("sales_data.csv")

def transform(raw_data):
    # Convert the "Order Date" column to type datetime
    raw_data["Order Date"] = pd.to_datetime(raw_data["Order Date"], format="%m/%d/%y %H:%M")
    
    # Only keep items under ten dollars
    clean_data = raw_data.loc[raw_data["Price Each"] < 10, :]
    return clean_data

clean_sales_data = transform(raw_sales_data)

# Check the data types of each column
print(clean_sales_data.dtypes)


### **Validating data transformations**
Great work so far! Manually spot-checking transformations is a great first step to ensuring that you're maintaining data quality throughout a pipeline. pandas offers several built-in functions to help you with just that!

To help get you started with this exercise, pandas has been imported as pd.

In [None]:
def extract(file_path):
    raw_data = pd.read_parquet(file_path)
    return raw_data

raw_sales_data = extract("sales_data.parquet")

def transform(raw_data):
    # Filter rows and columns
    clean_data = raw_data.loc[raw_data["Quantity Ordered"] == 1, ["Order ID", "Price Each", "Quantity Ordered"]]
    return clean_data

# Transform the raw_sales_data
clean_sales_data = transform(raw_sales_data)



---

## 🐼 Persisting Data with Pandas — Detailed Notes

### 1. **Overview**
- After **extracting** and **transforming** data, the next step is to **load** it — typically into files or databases.
- Pandas provides built-in methods to persist data for later use by data scientists, analysts, or downstream processes.

---

### 2. **Importance of Persisting Data in ETL Pipelines**
- **Persistence** means saving data at different stages (not just at the final "Load" step).
- It ensures that data snapshots are available:
  - For **recovery** in case of failure.
  - If data is **hard to reacquire** from the original source.
- Saves time by avoiding repeated extraction and transformation steps.

---

### 3. **Writing DataFrames to CSV with Pandas**
- Use `DataFrame.to_csv()` to write a DataFrame to a `.csv` file.
- Syntax:
  ```python
  df.to_csv("path/to/filename.csv")
  ```
- Example:
  ```python
  import pandas as pd
  df = pd.DataFrame(data)
  df.to_csv("stock_data.csv")
  ```
- By default, saves:
  - With header
  - With index
  - Using a comma separator

---

### 4. **Customizing CSV Output**
#### a) `header` Argument:
- Values: `True`, `False`, or list of column names (aliases).
- Default: `True` (includes column headers).
- Set to `False` to skip headers.

#### b) `index` Argument:
- Values: `True` (default), `False`.
- When `True`, index column is saved.
- Set to `False` if the index isn’t meaningful (recommended for clean output).

#### c) `sep` Argument:
- Default: `,` (comma).
- Alternative: `|` (pipe), `\t` (tab), etc.
- Useful for formatting or working with special systems.

#### Example:
```python
df.to_csv("file.csv", index=False, header=True, sep="|")
```
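
To see what the three arguments do to the output, the sketch below writes a small, made-up DataFrame to an in-memory buffer instead of a file and inspects the resulting lines. The alias names passed to `header` are hypothetical.

```python
import io

import pandas as pd

# Hypothetical stock data for illustration.
df = pd.DataFrame({"ticker": ["AAA", "BBB"], "close": [10.5, 20.25]})

# Write to an in-memory buffer so the result can be inspected directly.
buffer = io.StringIO()
df.to_csv(buffer, index=False, header=["symbol", "closing_price"], sep="|")
lines = buffer.getvalue().splitlines()
print(lines)

# Reading the text back requires the same separator.
round_trip = pd.read_csv(io.StringIO(buffer.getvalue()), sep="|")
print(round_trip)
```

Anyone consuming the file needs to know the separator and whether a header row is present, so it helps to document these choices alongside the pipeline.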

---

### 5. **Other File Formats in Pandas**
Besides `to_csv`, Pandas offers:

| Method         | Description                      |
|----------------|----------------------------------|
| `to_json()`    | Save DataFrame as JSON file      |
| `to_excel()`   | Save as Excel `.xlsx`            |
| `to_sql()`     | Save to a SQL database (needs connector) |
| `to_parquet()` | Save in efficient binary format  |

> These allow saving data in formats tailored for different platforms or performance needs.
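
Two of these methods are sketched below on made-up data. The example assumes an in-memory SQLite database from the standard library as the SQL target; a production pipeline would more likely pass a SQLAlchemy engine. The table name `stock_data` is hypothetical.

```python
import json
import sqlite3

import pandas as pd

# Hypothetical data for illustration.
df = pd.DataFrame({"ticker": ["AAA", "BBB"], "close": [10.5, 20.25]})

# With no path given, to_json returns the JSON text as a string.
json_text = df.to_json(orient="records")
records = json.loads(json_text)

# Assumption: an in-memory SQLite database stands in for a real target database.
conn = sqlite3.connect(":memory:")
df.to_sql("stock_data", conn, index=False, if_exists="replace")
row_count = conn.execute("SELECT COUNT(*) FROM stock_data").fetchone()[0]
conn.close()

print(records)
print(row_count)
```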

---

### 6. **Validating File Persistence with `os`**
After writing to a file, **validate success** by checking if the file exists.

```python
import os

file_path = "stock_data.csv"
if os.path.exists(file_path):
    print("File saved successfully.")
else:
    print("File not found. Something went wrong.")
```

This check is useful for:
- Debugging ETL pipelines.
- Ensuring reliability in automated workflows.
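
An existence check can be folded into the load step itself, as in the sketch below. The extra size check and the read-back row count go beyond `os.path.exists()` and are suggestions rather than part of the notes above; the data and file name are hypothetical.

```python
import os
import tempfile

import pandas as pd


def load(clean_data, file_path):
    """Write a DataFrame to CSV, then confirm the file exists and is not empty."""
    clean_data.to_csv(file_path, index=False)
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Expected output file at {file_path}")
    if os.path.getsize(file_path) == 0:
        raise ValueError(f"Output file at {file_path} is empty")


# Hypothetical data, written to a temporary directory that is cleaned up afterwards.
df = pd.DataFrame({"ticker": ["AAA", "BBB"], "close": [10.5, 20.25]})
with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "stock_data.csv")
    load(df, path)
    rows_written = len(pd.read_csv(path))

print(rows_written)
```

Raising an exception instead of printing a message lets an orchestrator or a surrounding `try-except` block react to the failure.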

---

### ✅ Summary
- Use `to_csv()` for quick, customizable CSV exports.
- Persist data at **multiple ETL stages** — not just the end.
- Validate file creation using `os.path.exists()`.
- Explore alternatives like `to_json()`, `to_sql()`, `to_parquet()` for specific needs.

---



### **Loading sales data to a CSV file**
Loading data is an essential component of any data pipeline. It ensures that any data consumers and processes have reliable access to data that you've extracted and transformed earlier in a pipeline. In this exercise, you'll practice loading transformed sales data to a CSV file using pandas, which has been imported as pd. In addition to this, the raw data has been extracted and is available in the DataFrame raw_sales_data.

In [None]:
def transform(raw_data):
    # Keep only items priced under 25 dollars
    return raw_data.loc[raw_data["Price Each"] < 25, ["Order ID", "Product", "Price Each", "Order Date"]]

def load(clean_data):
    # Write the data to a CSV file without the index column
    clean_data.to_csv("transformed_sales_data.csv", index=False)


clean_sales_data = transform(raw_sales_data)

# Call the load function on the cleaned DataFrame
load(clean_sales_data)


### **Customizing a CSV file**
Sometimes, data needs to be stored in a CSV file in a customized manner. This may include using different header values, including or excluding the index column of a DataFrame, or altering the character used to separate columns. In this example, you'll get to practice this, as well as ensuring that the file is stored in the desired file path.

The pandas library has been imported as pd, and the data has already been transformed to include only rows with a "Quantity Ordered" greater than one. The cleaned DataFrame is stored in a variable named clean_sales_data.

In [None]:
# Import the os library
import os

# Load the data to a csv file with the index, no header and pipe separated
def load(clean_data, path_to_write):
	clean_data.to_csv(path_to_write, header=False, sep="|")

load(clean_sales_data, "clean_sales_data.csv")

# Check that the file is present.
file_exists = os.path.exists("clean_sales_data.csv")
print(file_exists)


### **Persisting data to files**
Loading data to a final destination is one of the most important steps of a data pipeline. In this exercise, you'll use the transform() function shown below to transform product sales data before loading it to a .csv file. This will give downstream data consumers a better view into total sales across a range of products.

For this exercise, the sales data has been loaded and transformed, and is stored in the clean_sales_data DataFrame. The pandas package has been imported as pd, and the os library is also ready to use!



In [None]:
def load(clean_data, file_path):
    # Write the data to a file
    clean_data.to_csv(file_path, header=False, index=False)

    # Check to make sure the file exists
    file_exists = os.path.exists(file_path)
    if not file_exists:
        raise Exception(f"File does NOT exist at path {file_path}")

# Load the transformed data to the provided file path
load(clean_sales_data, "transformed_sales_data.csv")



---

## 📊 Monitoring a Data Pipeline — Detailed Notes

### 1. **Why Monitor Data Pipelines?**
- Once a pipeline is built, **monitoring** ensures:
  - Detection of **failures** during execution.
  - Awareness of **data quality or schema changes**.
  - Proactive **alerts** to engineers before end-users encounter issues.

---

### 2. **Common Failures in Pipelines**
- Missing data from source systems.
- Schema or data type changes (e.g., string to int).
- Deprecation or updates in tools used in the pipeline.

**Goal:** Make the pipeline **transparent** and **self-alerting** to reduce manual oversight.

---

### 3. **Using Logs to Monitor Performance**
- **Logs** = messages recorded during execution.
- Provide a **timeline** of what happened and when.
- Help **debug failures** by showing the exact execution path.

#### 🔧 Python’s `logging` module
- Defines five standard levels (`DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL`); these notes use the first four:
  - `DEBUG`: Used during development to inspect internal details (e.g., shapes, values).
  - `INFO`: Reports general progress (e.g., "Loaded data successfully").
  - `WARNING`: Something unexpected happened, but code continues (e.g., unusual row count).
  - `ERROR`: A step failed and could not complete (e.g., missing file or column). Logging the error does not stop execution by itself; that depends on how the exception is handled.

#### Example:
```python
import logging
logging.basicConfig(level=logging.INFO)

logging.debug("Data dimensions: 100 rows, 5 columns")  # not shown: DEBUG is below the INFO threshold
logging.info("Data loaded successfully")
logging.warning("Unexpected column found")
logging.error("File not found")
```
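
The effect of the configured level can be checked directly. The sketch below uses a dedicated logger that writes to an in-memory stream, so the emitted lines can be inspected; the logger name `pipeline_demo` and the format string are illustrative choices.

```python
import io
import logging

# A dedicated logger writing to an in-memory stream, so the output can be inspected.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s:%(name)s:%(message)s"))

logger = logging.getLogger("pipeline_demo")  # hypothetical logger name
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False  # keep the demo output out of the root logger

logger.debug("Data dimensions: 100 rows, 5 columns")  # below INFO, so dropped
logger.info("Data loaded successfully")
logger.warning("Unexpected column found")

captured = stream.getvalue().splitlines()
print(captured)
```

Only the `INFO` and `WARNING` messages are captured. Lowering the level to `logging.DEBUG` would include the first message as well.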

---

### 4. **Capturing Warnings and Errors**
- **Warning** logs: Notify about issues **not fatal** to execution.
  - Example: Unexpected row count.
- **Error** logs: Report a failure that prevents a step from completing.
  - Example: Missing or corrupted data.

Logging these makes it easier to:
- **Identify root causes**
- **Reduce troubleshooting time**
- Improve pipeline **reliability**

---

### 5. **Handling Exceptions with Try-Except**
Use `try-except` to catch and handle errors **without crashing the pipeline**.

#### Basic structure:
```python
try:
    # risky code
except Exception as e:
    logging.error(f"An error occurred: {e}")
```

#### Benefits:
- Prevents pipeline from halting unexpectedly.
- Ensures logs capture **why** the error happened.
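
A reusable wrapper is one way to apply this structure to every step of a pipeline. The sketch below is an assumption about how that might look, not a standard pattern from the notes: `run_step` and `divide` are hypothetical helpers, and returning `None` on failure is only one of several possible policies.

```python
import logging

logging.basicConfig(level=logging.INFO)


def run_step(step, *args):
    """Run one pipeline step; log any failure and return None instead of raising."""
    try:
        result = step(*args)
    except Exception as e:
        # logging.exception logs at ERROR level and appends the traceback.
        logging.exception(f"Step '{step.__name__}' failed: {e}")
        return None
    logging.info(f"Step '{step.__name__}' succeeded")
    return result


def divide(a, b):  # hypothetical step used only for illustration
    return a / b


ok = run_step(divide, 10, 2)
failed = run_step(divide, 1, 0)
```

Swallowing the exception keeps the pipeline alive, but callers then have to check for `None`. When a later step cannot work without the result, re-raising after logging is usually the safer choice.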

---

### 6. **Handling Specific Exceptions**
Use **specific exception types** (like `KeyError`, `FileNotFoundError`) for better control.

#### Example Use Case:
```python
try:
    df = df[df["price_change"] > 0]
except KeyError as e:
    logging.error(f"Missing column: {e}")
    df["price_change"] = 0
    df = df[df["price_change"] > 0]
```

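- If the column `price_change` doesn't exist, log the error.
- Create the column with a default value and re-run the filter logic. With a default of `0`, the `> 0` filter returns no rows, so downstream steps receive an empty DataFrame rather than an exception.

✅ This lets the pipeline continue **gracefully** while the log records what went wrong.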
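
Different exception types can call for different recoveries. The sketch below handles a missing file and a missing column separately. Instead of filling the column with a default value, it derives `price_change` from `close - open`, which is an assumption about what the column means; the file name and data are hypothetical.

```python
import logging

import pandas as pd

logging.basicConfig(level=logging.INFO)


def extract(file_path):
    """Read a CSV file; return an empty DataFrame if the file is missing."""
    try:
        return pd.read_csv(file_path)
    except FileNotFoundError as e:
        logging.error(f"Source file missing: {e}")
        return pd.DataFrame()


def filter_gains(df):
    """Keep rows with a positive price change, deriving the column if absent."""
    try:
        return df.loc[df["price_change"] > 0, :]
    except KeyError as e:
        # Assumption: price_change can be rebuilt from the close and open columns.
        logging.warning(f"Missing column {e}; deriving it from 'close' - 'open'")
        df = df.assign(price_change=df["close"] - df["open"])
        return df.loc[df["price_change"] > 0, :]


missing = extract("this_file_does_not_exist.csv")  # hypothetical path
prices = pd.DataFrame({"open": [10.0, 11.0], "close": [10.5, 10.0]})  # made-up data
gains = filter_gains(prices)
print(gains)
```

Catching only the expected exception types means that unrelated bugs still surface as errors instead of being silently absorbed.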

---

### 7. **Summary**
| Concept | Purpose |
|--------|---------|
| Logging | Monitor pipeline status & performance |
| Log Levels | Differentiate info, debug, warnings & errors |
| Try-Except | Handle errors without crashing |
| Specific Exceptions | Catch known issues with context |
| Transparency | Make pipeline issues visible before users notice |

---

### ✅ Best Practices
- Always use **logging** in production pipelines.
- Wrap risky operations in `try-except` blocks.
- Log **both success and failure** events.
- Start simple, then integrate tools like **Airflow**, **Prometheus**, or **Grafana** for advanced monitoring.

---


### **Logging within a data pipeline**
In this exercise, we'll take a look back at the function you wrote in a previous video and practice adding logging to the function. This will help when troubleshooting errors or making changes to the logic!

pandas has been imported as pd. In addition to this, the logging module has been imported, and the default log-level has been set to "debug".

In [None]:
def transform(raw_data):
    raw_data["Order Date"] = pd.to_datetime(raw_data["Order Date"], format="%m/%d/%y %H:%M")
    clean_data = raw_data.loc[raw_data["Price Each"] < 10, :]
    
    # Create an info log regarding transformation
    logging.info("Transformed 'Order Date' column to type 'datetime'.")
    
    # Create debug-level logs for the DataFrame before and after filtering
    logging.debug(f"Shape of the DataFrame before filtering: {raw_data.shape}")
    logging.debug(f"Shape of the DataFrame after filtering: {clean_data.shape}")
    
    return clean_data
  
clean_sales_data = transform(raw_sales_data)


### **Handling exceptions when loading data**
Sometimes, your data pipelines might throw an exception. These exceptions are a form of alerting, and they let a Data Engineer know when something unexpected happened. It's important to properly handle these exceptions. In this exercise, we'll practice just that!

To help get you started, pandas has been imported as pd, and the logging module has been imported as well. The default log-level has been set to "debug".

In [None]:
def extract(file_path):
    return pd.read_parquet(file_path)

# Update the pipeline to include a try block
try:
    # Attempt to read in the file
    raw_sales_data = extract("sales_data.parquet")

# Catch the FileNotFoundError
except FileNotFoundError as file_not_found:
    # Write an error-level log
    logging.error(file_not_found)


### **Monitoring and alerting within a data pipeline**
It's time to put it all together! You might have guessed it, but handling errors with try-except and logging go hand-in-hand. These two practices are essential for a pipeline to be resilient and transparent, and are the building blocks for more advanced monitoring and alerting solutions.

pandas has been imported as pd, and the logging module has been loaded and configured for you. The raw_sales_data DataFrame has been extracted, and is ready to be transformed.

In [None]:
# Create an info-level logging message to document success, and a warning-level logging message if the transformation fails.
def transform(raw_data):
	return raw_data.loc[raw_data["Total Price"] > 1000, :]

try:
	# Attempt to transform DataFrame, log an info-level message
	clean_sales_data = transform(raw_sales_data)
	logging.info("Successfully filtered DataFrame by 'Total Price'")
	
except Exception:
	# Log a warning-level message
	logging.warning("Cannot filter DataFrame by 'Total Price'")
 
 
 
# Update the try-except clause to catch a KeyError, and alias as ke.
# Change the warning-level log to include the error being thrown.


def transform(raw_data):
	return raw_data.loc[raw_data["Total Price"] > 1000, :]

try:
	clean_sales_data = transform(raw_sales_data)
	logging.info("Successfully filtered DataFrame by 'Total Price'")
	
# Update the exception to be a KeyError, alias as ke
except KeyError as ke:
	# Log a warning-level message
	logging.warning(f"{ke}: Cannot filter DataFrame by 'Total Price'")


# If a key error is thrown, create a column "Total Price" by multiplying the "Price Each" and "Quantity Ordered" columns.
def transform(raw_data):
	return raw_data.loc[raw_data["Total Price"] > 1000, :]

try:
	clean_sales_data = transform(raw_sales_data)
	logging.info("Successfully filtered DataFrame by 'Total Price'")

except KeyError as ke:
	logging.warning(f"{ke}: Cannot filter DataFrame by 'Total Price'")
	
	# Create the "Total Price" column, transform the updated DataFrame
	raw_sales_data["Total Price"] = raw_sales_data["Price Each"] * raw_sales_data["Quantity Ordered"]
	clean_sales_data = transform(raw_sales_data)

