In [7]:
# Data Pipelines
# Extract Data, Transform, Load and Analyze.
import pandas as pd

In [None]:
!pip install "apache-airflow[celery]==2.5.3" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.3/constraints-3.7.txt"
!pip install "apache-airflow-providers-google==6.0.0"

In [5]:
# Pipeline
from datetime import datetime, timedelta
from airflow import DAG
# NOTE(review): `airflow.operators.python_operator` is the legacy module path;
# Airflow 2 documents `airflow.operators.python` — confirm and migrate.
from airflow.operators.python_operator import PythonOperator
# NOTE(review): the Google provider documents GCSToGCSOperator under
# `airflow.providers.google.cloud.transfers.gcs_to_gcs` — verify this import resolves.
from airflow.providers.google.cloud.operators.gcs import GCSToGCSOperator

In [None]:
# Arguments handed to the DAG as `default_args` (task-level defaults).
default_args = {
    "owner": "Safaricom",
    "depends_on_past": False,
    "start_date": datetime(2023, 4, 10),
    # Failure notifications go to this address; retries do not send mail.
    "email": ["newton.kipngeno@student.moringaschool.com"],
    "email_on_failure": True,
    "email_on_retry": False,
    # Retry policy: three retries, five minutes apart.
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

# Billing DAG, scheduled once per day.
# `schedule` is the Airflow >= 2.4 name for the deprecated `schedule_interval`
# argument; the interval itself is unchanged.
# NOTE(review): `catchup` is not set, so with the 2023-04-10 start date the
# scheduler may backfill past intervals depending on configuration — confirm
# that this is intended.
dag = DAG(
    'billing_pipeline',
    default_args=default_args,
    description='Pipeline for billing',
    schedule=timedelta(days=1),
)

# Functions
# Extract data
def extract_csv(filename):
    """Read a CSV into a DataFrame.

    Args:
        filename: Passed unchanged to ``pandas.read_csv``.

    Returns:
        The DataFrame produced by ``pandas.read_csv``.
    """
    return pd.read_csv(filename)

# transform
def transform_data(dataframe):
    """Return ``dataframe`` with duplicate rows and incomplete rows removed.

    Duplicate rows are dropped first; any remaining row that contains a
    missing value is then dropped. A new DataFrame is returned.
    """
    return dataframe.drop_duplicates().dropna()

#load data
def load_data(df, ds_nodash=None):
    """Write ``df`` to ``<YYYYMMDD>_billing.csv`` in the working directory.

    Args:
        df: DataFrame to persist; it is written without its index.
        ds_nodash: Optional ``YYYYMMDD`` stamp for the file name. When this
            function runs as a ``PythonOperator`` callable, Airflow 2 passes
            the context value of the same name automatically, so the file is
            stamped with the run's logical date (what ``{{ ds_nodash }}``
            renders to in templates). When omitted, today's date is used.

    Returns:
        The name of the CSV file that was written.
    """
    # Wall-clock "today" differs from the logical date for scheduled and
    # backfilled runs, so prefer the stamp supplied by the caller/Airflow.
    date_stamp = ds_nodash or datetime.today().strftime("%Y%m%d")
    filename = date_stamp + '_billing.csv'
    df.to_csv(filename, index=False)
    return filename

def upload_to_gcs(filename):
    """Upload a local file to ``data/cdrs/<filename>`` in bucket ``mybucket``.

    Args:
        filename: Path of the local file to upload; it is also appended to
            the ``data/cdrs/`` prefix to form the object name.
    """
    # Imported locally because no module-level import of a GCS hook is
    # visible in this notebook; GCSHook is the Google provider's GCS hook.
    from airflow.providers.google.cloud.hooks.gcs import GCSHook

    gcs_hook = GCSHook()
    # GCSHook.upload names its target arguments `bucket_name` / `object_name`.
    gcs_hook.upload(
        bucket_name='mybucket',
        object_name='data/cdrs/' + filename,
        filename=filename,
    )
    
# Task wrapping extract_csv().
# NOTE(review): no op_args/op_kwargs are supplied here, yet extract_csv()
# requires `filename` — confirm how the input path is meant to be provided.
extract = PythonOperator(
    dag=dag,
    python_callable=extract_csv,
    task_id="extract_csv",
)

# Task wrapping transform_data().
# NOTE(review): transform_data() requires `dataframe`, but nothing here passes
# the upstream task's result in — confirm the intended hand-off.
transform = PythonOperator(
    dag=dag,
    python_callable=transform_data,
    task_id="transform_data",
)

# Task wrapping load_data().
# NOTE(review): load_data() requires `df`, but no argument is supplied via
# op_args/op_kwargs — confirm the intended hand-off from the transform task.
load = PythonOperator(
    dag=dag,
    python_callable=load_data,
    task_id="load_data",
)

# Final task: GCS-to-GCS operation inside `newton_bucket`, from the
# data/billing/ object to the archive/cdrs/ object; both object names embed
# the `{{ ds_nodash }}` template.
# NOTE(review): none of the visible tasks writes data/billing/... to this
# bucket — confirm where the source object comes from.
upload = GCSToGCSOperator(
    dag=dag,
    task_id="upload_to_gcs",
    source_bucket="newton_bucket",
    source_object="data/billing/{{ ds_nodash }}_billing.csv",
    destination_bucket="newton_bucket",
    destination_object="archive/cdrs/{{ ds_nodash }}_billing.csv",
)

# Linear chain: each task is downstream of the one before it.
extract >> transform >> load >> upload
