## Extracting the Data

In [1]:
import pandas as pd

In [17]:
# Read the raw customer-churn export from its location on the local machine.
csv_path = r"C:\Users\hp\Desktop\10alytics\Capstone Project\Customer-Churn.csv"
df = pd.read_csv(csv_path)

In [18]:
# Preview the first five rows of the loaded data.
df.head(n=5)

Unnamed: 0,customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,...,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
0,7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No
1,5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,...,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No
2,3668-QPYBK,Male,0,No,No,2,Yes,No,DSL,Yes,...,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes
3,7795-CFOCW,Male,0,No,No,45,No,No phone service,DSL,Yes,...,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,No
4,9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes


## Transforming the Data to ensure data quality
This process involves converting data coming from different systems into a format suitable for the target system:
* Data cleaning
* Removing duplicates
* Data aggregation, such as summing, for easier use
* Standardization/integration, e.g. converting prices to a default currency if items are sold in different currencies, or converting datetimes to a common format
* Data filtering: removing unwanted information from the data

# Finding duplicates

In [4]:
# List the column labels available in the DataFrame.
df.columns

Index(['customerID', 'gender', 'SeniorCitizen', 'Partner', 'Dependents',
       'tenure', 'PhoneService', 'MultipleLines', 'InternetService',
       'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport',
       'StreamingTV', 'StreamingMovies', 'Contract', 'PaperlessBilling',
       'PaymentMethod', 'MonthlyCharges', 'TotalCharges', 'Churn'],
      dtype='object')

In [5]:
# Column(s) to inspect for repeated values when looking for duplicate records.
columns_to_check = ["customerID"]

In [6]:
# Collect every row whose key columns (columns_to_check, defined in the
# previous cell) repeat an earlier row. `keep` defaults to 'first', so the
# first occurrence is not flagged -- only the later repeats are returned.
Duplicates = df[df.duplicated(subset=columns_to_check)]
print(Duplicates)

Empty DataFrame
Columns: [customerID, gender, SeniorCitizen, Partner, Dependents, tenure, PhoneService, MultipleLines, InternetService, OnlineSecurity, OnlineBackup, DeviceProtection, TechSupport, StreamingTV, StreamingMovies, Contract, PaperlessBilling, PaymentMethod, MonthlyCharges, TotalCharges, Churn]
Index: []

[0 rows x 21 columns]


In [7]:
# (rows, columns) of the duplicate subset; zero rows means no repeated customerID.
Duplicates.shape

(0, 21)

## To check for duplicates across the entire dataset (all columns), I would do this

In [8]:
# Number of rows that repeat an earlier row in every column
# (subset=None compares all columns; keep="first" leaves the first occurrence unflagged).
duplicates = df.duplicated(subset=None, keep="first").sum()
duplicates

0

## Observation
There are no duplicates.

# If there were duplicates, this is how I would remove them

In [9]:
 # Keep only the first instance of each customerID and drop the later repeats.
# NOTE: indexing with df.duplicated(...) directly would select the duplicate rows
# rather than remove them, so drop_duplicates is used here.
#df = df.drop_duplicates(subset='customerID', keep='first')
#print(df)

In [10]:
# Count missing values per column.
# The result is stored under its own name: assigning it back to `df` would
# replace the DataFrame with a Series of counts and break the later cells
# that upload `df` to BigQuery and inspect `df.shape`.
missing_counts = df.isnull().sum()
missing_counts

customerID          0
gender              0
SeniorCitizen       0
Partner             0
Dependents          0
tenure              0
PhoneService        0
MultipleLines       0
InternetService     0
OnlineSecurity      0
OnlineBackup        0
DeviceProtection    0
TechSupport         0
StreamingTV         0
StreamingMovies     0
Contract            0
PaperlessBilling    0
PaymentMethod       0
MonthlyCharges      0
TotalCharges        0
Churn               0
dtype: int64

# If I want to drop/remove columns, I will do this

In [11]:
# drop() targets row labels by default, so axis=1 is needed to drop columns.
#df = df.drop(['customerID', 'gender'], axis=1)
#df

In [12]:
# Alternatively, pass the column names through the `columns` keyword
#columns_to_remove = ['customerID', 'gender']
#df = df.drop(columns=columns_to_remove)
#df

# Loading data into bigquery

In [13]:
# Libraries used for the BigQuery load step.
import pandas
import pandas_gbq
from google.cloud import bigquery

# Reaching this line means every import above resolved.
print("Libraries imported successfully!")


Libraries imported successfully!


 ## Set Up the Authentication

In [14]:
import os

# Location of the service-account key JSON file on the local machine.
service_account_key_path = r"C:\Users\hp\Desktop\etl pipeline with bigquery\etl-pipeline-444417-ca190dd4dcb5.json"

# Publish the key location through the GOOGLE_APPLICATION_CREDENTIALS environment variable.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = service_account_key_path



## Test the Connection

In [15]:
from google.cloud import bigquery

# Smoke-test statement: asks BigQuery for its current timestamp.
query = "SELECT CURRENT_TIMESTAMP() AS current_time"

# The client is created without explicit arguments
# (presumably it authenticates via GOOGLE_APPLICATION_CREDENTIALS set earlier -- confirm).
client = bigquery.Client()
query_job = client.query(query)

# Iterate over the rows produced by the job and show the returned timestamp.
for row in query_job:
    print(f"Current Time: {row['current_time']}")


Current Time: 2024-12-11 23:02:34.820853+00:00


# Load the data to the data warehouse

In [19]:
from pandas_gbq import to_gbq

# Where the DataFrame goes in BigQuery.
project_id = "etl-pipeline-444417"  # Google Cloud project ID
dataset_id = "etl_dataset"  # dataset name
table_id = "main_table"  # table name in BigQuery

# The destination is passed to to_gbq as "<dataset>.<table>".
destination_table = f"{dataset_id}.{table_id}"

# Upload the DataFrame with if_exists="replace".
to_gbq(
    dataframe=df,
    destination_table=destination_table,
    project_id=project_id,
    if_exists="replace",
)

print("Data uploaded successfully to BigQuery!")


100%|████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]

Data uploaded successfully to BigQuery!





# Tests and checks for data quality to ensure data is not lost, truncated, or corrupted at the source or destination
* Check for nulls, empty rows, and duplicates
* Count the number of columns in the source and destination
* Check if the data is fresh: it is fresh if the maximum timestamp in the table is no older than C minus 1 day, where C = current timestamp

  Running `df.shape` in Python and `COUNT(*)` in BigQuery serves as a basic check

In [20]:
# (rows, columns) of the source DataFrame, for comparison with COUNT(*) in BigQuery.
df.shape

(7043, 21)

# Automate ETL to make sure it's always fresh, scheduling jobs with Apache Airflow
Other tools we can use are:
- MS Azure Data Factory
- Apache NiFi
- Apache Oozie
- SSIS

# Open Docker, then run the following commands in a PowerShell terminal, in this order
* Invoke-WebRequest -Uri "https://airflow.apache.org/docs/apache-airflow/2.6.3/docker-compose.yaml" -OutFile "docker-compose.yaml"
* New-Item -ItemType Directory -Force -Path "dags", "logs", "plugins", "config"
* [Environment]::SetEnvironmentVariable('AIRFLOW_UID', '50000', 'User')
* docker compose up airflow-init
* docker compose up -d
* After these commands complete successfully, access the Airflow web interface at:

http://localhost:8080
Username: airflow
Password: airflow

Make sure Docker Desktop is running on the Windows machine before executing these commands.


Make sure to set up a connection in Airflow for BigQuery. Here's how to do it:

In Airflow UI:

Click on "Admin" in the top menu
Click on "Connections"
Click the "+" button to add a new connection


Fill in these details:

Connection Id: google_cloud_default
Connection Type: Google Cloud
Keyfile Path: Your JSON key file path
Project Id: etl-pipeline-444417