# **Building an ETL Pipeline**

- Ready to ratchet up the fun? In this exercise, you'll be responsible for building the rest of the load() function before running each step in the ETL process. The extract() and transform() functions have been defined for you.

**Instructions**

- Complete the load() function by writing the transformed_data DataFrame to a .csv file, using file_name.
- Use the transform() function to clean the extracted_data DataFrame.
- Load transformed_data to the transformed_data.csv file using the load() function.

In [None]:
def load(data_frame, file_name):
  # Write data_frame to a CSV using file_name
  data_frame.to_csv(file_name)
  print(f"Successfully loaded data to {file_name}")

extracted_data = extract(file_name="raw_data.csv")

# Transform extracted_data using transform() function
transformed_data = transform(data_frame=extracted_data)

# Load transformed_data to the file transformed_data.csv
load(data_frame=transformed_data, file_name="transformed_data.csv")


In [None]:
Extracting data from a source system
Transformed the raw data returned from extract()
Successfully loaded data to transformed_data.csv

**Explanation:**

- This code snippet demonstrates a simple data processing pipeline. 
- It first extracts data from a CSV file (raw_data.csv), transforms it using a transform function (not shown), and then saves the transformed data to another CSV file (transformed_data.csv). 
- The load function handles the saving process, printing a success message. 
- Note that load() writes to whatever path is passed as file_name rather than to a hardcoded filename, so the same function can be reused for any output file.
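
The exercise predefines extract() and transform(), so their bodies are not shown here. The following is a minimal, self-contained sketch of the same flow, assuming hypothetical stand-ins for both functions; the sample columns (`id`, `value`) and the `dropna()` cleaning step are illustrative choices, not taken from the exercise.

```python
import os
import tempfile

import pandas as pd


def extract(file_name):
    # Hypothetical stand-in: read the raw CSV into a DataFrame
    print("Extracting data from a source system")
    return pd.read_csv(file_name)


def transform(data_frame):
    # Hypothetical stand-in: drop rows that contain missing values
    print("Transformed the raw data returned from extract()")
    return data_frame.dropna()


def load(data_frame, file_name):
    # Write the DataFrame to the path given by file_name
    data_frame.to_csv(file_name)
    print(f"Successfully loaded data to {file_name}")


# Create a small raw file in a temporary directory so the sketch runs on its own
work_dir = tempfile.mkdtemp()
raw_path = os.path.join(work_dir, "raw_data.csv")
out_path = os.path.join(work_dir, "transformed_data.csv")
pd.DataFrame({"id": [1, 2, 3], "value": [10.0, None, 30.0]}).to_csv(raw_path, index=False)

extracted_data = extract(file_name=raw_path)
transformed_data = transform(data_frame=extracted_data)
load(data_frame=transformed_data, file_name=out_path)
```

Each step hands its result to the next one, so any of the three functions can be swapped out without touching the others.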

# **The "T" in ELT**

- Let's not forget about ELT! Here, the extract() and load() functions have been defined for you. Now, all that's left is to finish defining the transform() function and run the pipeline. Go get 'em!

**Instructions**

- Update the transform() function to call the .execute() method on the data_warehouse object.
- Use the newly-updated transform() function to populate data in the total_sales target table by transforming data in the raw_sales_data source table.

In [None]:
# Complete building the transform() function
def transform(source_table, target_table):
  data_warehouse.execute(f"""
  CREATE TABLE {target_table} AS
      SELECT
          CONCAT("Product ID: ", product_id),
          quantity * price
      FROM {source_table};
  """)

extracted_data = extract(file_name="raw_sales_data.csv")
load(data_frame=extracted_data, table_name="raw_sales_data")

# Populate total_sales by transforming raw_sales_data
transform(source_table="raw_sales_data", target_table="total_sales")

In [None]:
Extracted data from file storage.
Loading extracted data to sale_items.
Ran the query: 
  CREATE TABLE total_sales AS
      SELECT
          CONCAT("Product ID: ", product_id),
          quantity * price
      FROM raw_sales_data;
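
The `data_warehouse` object is provided by the exercise and is not defined here. To try the same ELT pattern locally, here is a rough sketch that assumes an in-memory SQLite connection as a stand-in for it. The sample rows and the column aliases (`product_label`, `total_sale`) are hypothetical, and the `||` operator replaces `CONCAT` because older SQLite versions do not provide that function.

```python
import sqlite3

# Assumption: an in-memory SQLite database stands in for data_warehouse
data_warehouse = sqlite3.connect(":memory:")

# "Load" step: the raw rows land in the warehouse before any transformation
data_warehouse.execute(
    "CREATE TABLE raw_sales_data (product_id INTEGER, quantity INTEGER, price REAL)"
)
data_warehouse.executemany(
    "INSERT INTO raw_sales_data VALUES (?, ?, ?)",
    [(1, 2, 5.0), (2, 1, 12.5)],
)


def transform(source_table, target_table):
    # Table names cannot be bound as query parameters, so they are
    # interpolated into the SQL; only pass trusted names here
    data_warehouse.execute(f"""
    CREATE TABLE {target_table} AS
        SELECT
            'Product ID: ' || product_id AS product_label,
            quantity * price AS total_sale
        FROM {source_table};
    """)


# "Transform" step: runs inside the warehouse, after loading
transform(source_table="raw_sales_data", target_table="total_sales")

rows = data_warehouse.execute(
    "SELECT product_label, total_sale FROM total_sales ORDER BY product_label"
).fetchall()
print(rows)
```

The ordering of the calls is the point of the sketch: data is loaded first and transformed afterwards with SQL, which is what distinguishes ELT from ETL.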

# **Extracting, Transforming, and Loading Student Scores Data**

- Alright, it's time to build your own ETL pipeline from scratch. In this exercise, you'll build three functions: extract(), transform(), and load(). Then, you'll use these functions to run your pipeline.
  
  The pandas library has been imported as pd.

**Instructions**

- In the extract() function, use the appropriate pandas function to read a CSV into memory.

In [None]:
def extract(file_name):
  # Read the CSV at the path stored in file_name into memory
  return pd.read_csv(file_name)

**Instructions**

- Update the transform() function to filter the data_frame to only include the columns industry_name and number_of_firms.

In [None]:
def extract(file_name):
  return pd.read_csv(file_name)

def transform(data_frame):
  # Filter the data_frame to only include a subset of columns
  return data_frame.loc[:, ["industry_name", "number_of_firms"]]
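
To see what the column filter does in isolation, here is a small sketch using a made-up DataFrame; the rows and the `extra_column` and `missing_column` names are hypothetical. It also shows that, in recent pandas versions, asking `.loc` for a column label that does not exist raises a `KeyError` instead of silently returning fewer columns.

```python
import pandas as pd

# Hypothetical sample data; "extra_column" is invented for illustration
data_frame = pd.DataFrame({
    "industry_name": ["Retail", "Mining"],
    "number_of_firms": [120, 45],
    "extra_column": ["a", "b"],
})

# Keep all rows (:) and only the two listed columns
subset = data_frame.loc[:, ["industry_name", "number_of_firms"]]
print(subset.columns.tolist())

# Requesting a label that is not present raises KeyError in recent pandas
try:
    data_frame.loc[:, ["industry_name", "missing_column"]]
    missing_raised = False
except KeyError:
    missing_raised = True
print(missing_raised)
```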

**Instructions**

- In the load() function, write the data_frame to the path given by the file_name parameter.

In [None]:
def extract(file_name):
  return pd.read_csv(file_name)

def transform(data_frame):
  return data_frame.loc[:, ["industry_name", "number_of_firms"]]

def load(data_frame, file_name):
  # Write the data_frame to a CSV
  data_frame.to_csv(file_name)

**Instructions**

- Pass the transformed_data DataFrame to the load() function, and run the ETL pipeline.

In [None]:
def extract(file_name):
  return pd.read_csv(file_name)

def transform(data_frame):
  return data_frame.loc[:, ["industry_name", "number_of_firms"]]

def load(data_frame, file_name):
  data_frame.to_csv(file_name)
  
extracted_data = extract(file_name="raw_industry_data.csv")
transformed_data = transform(data_frame=extracted_data)

# Pass the transformed_data DataFrame to the load() function
load(data_frame=transformed_data, file_name="number_of_firms.csv")
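
One detail worth checking after a run like this is what actually lands in the file. By default, `to_csv()` also writes the DataFrame's index as an unnamed first column, which reappears as `Unnamed: 0` when the file is read back. The sketch below illustrates this with hypothetical sample rows and in-memory buffers instead of real files; passing `index=False` is one option if the index is not wanted in the output.

```python
import io

import pandas as pd

# Hypothetical sample data using the exercise's column names
transformed_data = pd.DataFrame(
    {"industry_name": ["Retail", "Mining"], "number_of_firms": [120, 45]}
)

# Default behavior: the index is written as an extra, unnamed first column
with_index = io.StringIO()
transformed_data.to_csv(with_index)
with_index.seek(0)
cols_with_index = pd.read_csv(with_index).columns.tolist()
print(cols_with_index)

# index=False writes only the DataFrame's own columns
without_index = io.StringIO()
transformed_data.to_csv(without_index, index=False)
without_index.seek(0)
cols_without_index = pd.read_csv(without_index).columns.tolist()
print(cols_without_index)
```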