# Data Eng: Week 2 Practical

In this practical you will modify the pipeline from the Week 1 practical to read from Google Cloud Storage (GCS) and Amazon S3. You will also modify the extract step to extract data for a single day.

---

## Overview

Our data sources are still `users` and `orders`. However, `users` is now a CSV file in a GCS bucket, and `orders` is a Parquet file in an S3 bucket.

We want to extract data for a particular day, so that this job can be scheduled and orchestrated in future.
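When a scheduler runs a daily job, the day usually arrives as a string argument. Below is a minimal sketch of validating it, assuming a `YYYY-MM-DD` format. `parse_run_date` is a hypothetical helper, and defaulting to yesterday is an illustrative choice rather than a requirement of this practical.

```python
from datetime import date, datetime, timedelta
from typing import Optional


def parse_run_date(value: Optional[str] = None) -> str:
    """Return a validated 'YYYY-MM-DD' run date (hypothetical helper).

    With no value, default to yesterday, a common choice for daily
    batch jobs that process the previous day's data.
    """
    if value is None:
        return (date.today() - timedelta(days=1)).isoformat()
    # strptime raises ValueError for malformed or impossible dates
    return datetime.strptime(value, "%Y-%m-%d").date().isoformat()


print(parse_run_date("2000-01-02"))  # 2000-01-02
```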

![Week 2 pipeline diagram](WTC-Week2.drawio.png)


**Note**:
  * You must process the data in memory, rather than writing it to this notebook's disk first.
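Processing in memory means wrapping the downloaded bytes in a file-like buffer from the standard `io` module and handing it straight to pandas, instead of saving a file first. A minimal sketch; the bytes below are made-up and stand in for what a cloud storage download would return.

```python
from io import BytesIO

import pandas as pd

# stand-in for the bytes returned by a cloud storage download
raw = b"user_id,gender\n1,female\n2,male\n"

# wrap the bytes in an in-memory file object; nothing is written to disk
df = pd.read_csv(BytesIO(raw))
print(df.shape)  # (2, 2)
```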


In [None]:
# set up your python environment by installing the necessary packages
# we run this command using the bash operator prefix `!`, which executes bash
# code on the underlying machine
# Student: You will need to add the missing packages to install to access Google Cloud Storage, AWS S3, etc
# pyarrow lets pandas read parquet; google-cloud-storage and boto3 give access to GCS and S3

! pip install --quiet pandas pyarrow google-cloud-storage boto3

In [None]:
# import the python packages
import pandas as pd

# import package to read from google cloud storage
from google.cloud import storage
from io import StringIO

## Extract
Complete the extract function for the users 'table'.

The required csv file (`users_v.csv`) is stored on a GCS bucket (`bdt-beam`).

_HINT: To read from a public bucket, you need to create an anonymous (unauthenticated) client - see [stackoverflow](https://stackoverflow.com/questions/55421293/use-python-google-storage-client-without-credentials)_
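As an alternative to the SDK, objects in a publicly readable GCS bucket can also be fetched over plain HTTPS at `https://storage.googleapis.com/<bucket>/<object>`, and `pd.read_csv` accepts a URL directly. A sketch, assuming the bucket allows public reads; `public_gcs_url` is a hypothetical helper.

```python
from urllib.parse import quote


def public_gcs_url(bucket_name: str, filename: str) -> str:
    """Build the public HTTPS URL for an object in a GCS bucket."""
    # percent-encode the object name, but keep '/' so folder-like paths survive
    return f"https://storage.googleapis.com/{bucket_name}/{quote(filename, safe='/')}"


url = public_gcs_url("bdt-beam", "users_v.csv")
print(url)  # https://storage.googleapis.com/bdt-beam/users_v.csv
# with network access, pandas can then read it directly: pd.read_csv(url)
```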

In [None]:
from google.api_core.exceptions import NotFound

###
# this function is to be completed by the student
##
def gcs_extract(bucket_name: str, filename: str, delimiter: str = ',') -> pd.DataFrame:
  """
  extract data from a csv file in a GCS bucket and structure it into a pandas dataframe

  Args:
     bucket_name (str): name of the bucket
     filename (str): name of the file
     delimiter (str): default ','

  Return:
     pandas.DataFrame: A pandas dataframe with header as defined by the csv file
  """

  try:
    ## --- begin: student to complete subsequent code using Google SDK
    # create an anonymous (unauthenticated) client; enough for a public bucket
    client = storage.Client.create_anonymous_client()
    # bucket() builds a local reference without an API call, unlike get_bucket(),
    # which needs permission to read the bucket's metadata
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(filename)
    # download into memory and parse, without writing to local disk
    data = blob.download_as_text()
    df = pd.read_csv(StringIO(data), delimiter=delimiter)
    return df
    ### --- end
  except NotFound:
    # Handle the case where the bucket or file is not found in GCS
    print(f"Error: File '{filename}' not found in bucket '{bucket_name}'.")
    return None
  except Exception as e:
    print(f"Error during extraction: {e}")
    return None

In [None]:
## --- begin: student to complete subsequent code

# Complete the function call to read users_v.csv into a dataframe and check the result
df_users = gcs_extract('bdt-beam', 'users_v.csv')
df_users.head()
# --- end:

Complete the extract function for the orders 'table'.

Extract only the data for a specific day (2000-01-02) by passing the day as a parameter. Read the parquet file and explore its format before parameterising it.

The required parquet file (`df_orders.parquet`) is stored in an S3 bucket (`dev-training-analytics`).

_HINT: To read a parquet file into a pandas dataframe, you can use the pandas read_parquet() function - see [stackoverflow](https://stackoverflow.com/questions/33813815/how-to-read-a-parquet-file-into-pandas-dataframe). You may also need to add some additional packages to your import cell at the top of the notebook._
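Once the parquet file is loaded, keeping a single day is a boolean mask on the date column. A minimal sketch, assuming the column is called `date_purchased` (the name `transform` groups on) and may hold either strings or timestamps; `filter_day` is a hypothetical helper and the frame is made-up sample data. Printing `dtypes` first shows which form the real file uses.

```python
import pandas as pd


def filter_day(df: pd.DataFrame, day: str, column: str = "date_purchased") -> pd.DataFrame:
    """Keep only the rows of `df` whose `column` falls on `day` ('YYYY-MM-DD')."""
    # to_datetime accepts both string and datetime columns; strftime then turns
    # each value into a 'YYYY-MM-DD' string that can be compared with `day`
    days = pd.to_datetime(df[column]).dt.strftime("%Y-%m-%d")
    return df[days == day]


# made-up sample data: two of the three orders fall on 2000-01-02
orders = pd.DataFrame({
    "user_id": [1, 2, 3],
    "date_purchased": ["2000-01-01 09:00", "2000-01-02 10:15", "2000-01-02 15:30"],
})
print(orders.dtypes)  # explore the column types first
print(filter_day(orders, "2000-01-02"))
```

Comparing formatted strings, rather than timestamps, keeps the mask working whether or not the column carries a time of day or a timezone.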

In [None]:
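import boto3
from botocore import UNSIGNED
from botocore.config import Config
from botocore.exceptions import ClientError
from io import BytesIO
from typing import Optional

###
# this function is to be completed by the student
##
def s3_extract(bucket_name: str, filename: str, date: Optional[str] = None) -> pd.DataFrame:
  """
  extract data from a parquet file in an S3 bucket and structure it into a pandas dataframe

  Args:
     bucket_name (str): name of the bucket
     filename (str): name of the file (the object key)
     date (str): optional day to keep, as 'YYYY-MM-DD'; default None keeps every row

  Return:
     pandas.DataFrame: A pandas dataframe with columns as defined by the parquet file
  """

  try:
    ## --- begin: student to complete subsequent code
    # the bucket is public, so send unsigned (anonymous) requests instead of
    # looking for AWS credentials
    s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
    obj = s3.get_object(Bucket=bucket_name, Key=filename)
    # read the body into an in-memory buffer; read_parquet needs a seekable file
    df = pd.read_parquet(BytesIO(obj['Body'].read()))
    if date is not None:
      # keep only orders placed on `date` (assumes a `date_purchased` column,
      # the name transform groups on)
      day = pd.to_datetime(df['date_purchased']).dt.strftime('%Y-%m-%d')
      df = df[day == date]
    return df
    ### --- end
  except ClientError as e:
    # e.g. NoSuchKey when the file is not found in S3, or AccessDenied
    print(f"Error: could not read '{filename}' from bucket '{bucket_name}': {e}")
    return None
  except Exception as e:
    print(f"Error during extraction: {e}")
    return None

In [None]:
## --- begin: student to complete subsequent code

# Complete the function call to read df_orders.parquet for 2000-01-02 into a dataframe and check the result
df_orders = s3_extract('dev-training-analytics', 'df_orders.parquet', date='2000-01-02')
df_orders.head()
# --- end: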

## Transform

Our transform function remains the same as in Week 1.
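To see the shape of the result, here is the same join and group-by that `transform` performs, applied to a tiny made-up dataset (the values are illustrative, not taken from the practical's files):

```python
import pandas as pd

users = pd.DataFrame({"user_id": [1, 2, 3], "gender": ["female", "male", "male"]})
orders = pd.DataFrame({
    "user_id": [1, 2, 2, 3],
    "date_purchased": ["2000-01-02"] * 4,
})

# inner join on user_id, then count the orders per (day, gender) pair
joined = users.merge(orders, on="user_id")
summary = (
    joined.groupby(["date_purchased", "gender"])["user_id"]
    .count()
    .rename("total_purchases")
    .reset_index()
)
print(summary)
```

User 1 contributes one female purchase; users 2 and 3 together contribute three male purchases, so `summary` has one row per gender for 2000-01-02.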

In [None]:
def transform(users: pd.DataFrame, orders: pd.DataFrame) -> pd.DataFrame:
  """
  Join the users to their orders and count the purchases per day and gender

  Args:
     users: a dataframe of user information
     orders: a dataframe of user orders

  Return:
     pandas.DataFrame: A pandas dataframe with columns
     ['date_purchased', 'gender', 'total_purchases']
  """
  joined = users.merge(orders, on='user_id')
  # perform the groupby operation and be sure to subset - `count` will
  # count the purchases.
  df = joined.groupby(['date_purchased', 'gender'])['user_id'].count().rename('total_purchases')
  return df.reset_index()



In [None]:
df_transform = transform(df_users, df_orders)
df_transform.head()

## Load

Our load step remains the same as in Week 1.


In [None]:
def load(transformed: pd.DataFrame, filename: str, delimiter: str = ',') -> None:
  """
  Write the dataframe to a csv with the specified delimiter

  Args:
    transformed: a dataframe with columns ['date_purchased','gender','total_purchases']
    filename: a filename for the output CSV
    delimiter: an optional delimiter signifying column boundaries in the CSV file
  Return:
    None
  """
  try:
    # Write the DataFrame to a CSV file using pandas.to_csv
    transformed.to_csv(filename, sep=delimiter, index=False)
    print(f"DataFrame successfully written to '{filename}'.")
  except Exception as e:
    # Handle any errors that might occur during writing
    print(f"Error writing DataFrame to CSV: {e}")

## Pipeline

Run your pipeline end to end.

Expected output (use `!head output.csv`):


| date_purchased | gender | total_purchases |
| --- | ---- | ---- |
| 2000-01-02 | female | 4 |
| 2000-01-02 | male | 10 |
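`load` calls `to_csv(..., index=False)`, so the file holds a header row followed by one comma-separated line per row, with no index column. A sketch that rebuilds the expected table above and renders it in memory shows the text `!head output.csv` should print:

```python
import pandas as pd

expected = pd.DataFrame({
    "date_purchased": ["2000-01-02", "2000-01-02"],
    "gender": ["female", "male"],
    "total_purchases": [4, 10],
})

# with no path argument, to_csv returns the CSV text instead of writing a file
csv_text = expected.to_csv(index=False)
print(csv_text)
```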

In [None]:
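## --- begin: student to complete subsequent code - remember the try/except

# Run your pipeline here using the functions you have created
try:
    # stop early if either extract step returned None
    if df_users is None or df_orders is None:
        raise ValueError("extraction failed; see the errors printed above")
    transformed = transform(df_users, df_orders)
    # write to output.csv so that `!head output.csv` shows the result
    load(transformed, 'output.csv')
    print("Data pipeline completed successfully.")
except Exception as e:
    print(f"Pipeline failed: {e}")

! head output.csv
## --- end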
