<a href="https://colab.research.google.com/github/LonnieSly/-ETL_pipeline_project_01/blob/main/etl_pipeline_01.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import pandas as pd
from pandas_gbq import to_gbq
from datetime import datetime
import time

In [2]:
# Step 1: Load data
df = pd.read_csv("restaurant_reviews.csv")


In [3]:
# Step 2: Quick check of structure
print("Shape:", df.shape)
print("Columns:", df.columns.tolist())
print("Sample rows:")
print(df.head())

Shape: (10000, 8)
Columns: ['Restaurant', 'Reviewer', 'Review', 'Rating', 'Metadata', 'Time', 'Pictures', '7514']
Sample rows:
        Restaurant              Reviewer  \
0  Beyond Flavours     Rusha Chakraborty   
1  Beyond Flavours  Anusha Tirumalaneedi   
2  Beyond Flavours       Ashok Shekhawat   
3  Beyond Flavours        Swapnil Sarkar   
4  Beyond Flavours                Dileep   

                                              Review Rating  \
0  The ambience was good, food was quite good . h...      5   
1  Ambience is too good for a pleasant evening. S...      5   
2  A must try.. great food great ambience. Thnx f...      5   
3  Soumen das and Arun was a great guy. Only beca...      5   
4  Food is good.we ordered Kodi drumsticks and ba...      5   

                  Metadata             Time  Pictures    7514  
0   1 Review , 2 Followers  5/25/2019 15:54         0  2447.0  
1  3 Reviews , 2 Followers  5/25/2019 14:20         0     NaN  
2  2 Reviews , 3 Followers  5/24/2019

In [4]:
# Make a copy to work on
df_cleaned = df.copy()

In [5]:
# 1. Drop corrupted/misnamed column
if '7514' in df_cleaned.columns:
    df_cleaned.drop(columns=['7514'], inplace=True)

In [6]:
print("Columns:",df_cleaned.columns.tolist())

Columns: ['Restaurant', 'Reviewer', 'Review', 'Rating', 'Metadata', 'Time', 'Pictures']


In [7]:
# 2. Check for null values in critical columns
print(df_cleaned[['Review', 'Rating', 'Time', 'Metadata']].isnull().sum())

Review      45
Rating      38
Time        38
Metadata    38
dtype: int64


In [8]:
# 3. Drop rows with missing reviews or ratings
df_cleaned.dropna(subset=['Review', 'Rating'], inplace=True)

print(df_cleaned[['Review', 'Rating', 'Time', 'Metadata']].isnull().sum())

Review      0
Rating      0
Time        0
Metadata    0
dtype: int64


In [9]:
# 4. Convert Rating to float (some are stored as strings)
df_cleaned['Rating'] = pd.to_numeric(df_cleaned['Rating'], errors='coerce')

print(df_cleaned['Rating'].dtype)

float64


In [10]:
# 5. Convert Time to datetime
df_cleaned['Time'] = pd.to_datetime(df_cleaned['Time'], errors='coerce')

print(df_cleaned['Time'].dtype)

datetime64[ns]


In [11]:
# 6. Drop rows where time is still null (bad formatting)
df_cleaned.dropna(subset=['Time'], inplace=True)

In [12]:
# 7. Extract "Review Count" and "Follower Count" from Metadata
def parse_metadata(meta):
    try:
        reviews = int(meta.split(",")[0].strip().split()[0])
        followers = int(meta.split(",")[1].strip().split()[0])
        return pd.Series([reviews, followers])
    except:
        return pd.Series([None, None])

df_cleaned[['Review_Count', 'Follower_Count']] = df_cleaned['Metadata'].apply(parse_metadata)

In [13]:
print("Columns:", df_cleaned.columns.tolist())

Columns: ['Restaurant', 'Reviewer', 'Review', 'Rating', 'Metadata', 'Time', 'Pictures', 'Review_Count', 'Follower_Count']


In [14]:
# 8. Drop rows with null Review_Count or Follower_Count (optional)
df_cleaned.dropna(subset=['Review_Count', 'Follower_Count'], inplace=True)

In [15]:
# 9. Convert extracted counts to integers
df_cleaned['Review_Count'] = df_cleaned['Review_Count'].astype(int)
df_cleaned['Follower_Count'] = df_cleaned['Follower_Count'].astype(int)

In [16]:
# 10. Reset index for a clean export
df_cleaned.reset_index(drop=True, inplace=True)

In [17]:
# Final check
print(df_cleaned.info())
print(df_cleaned.head(2))

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8381 entries, 0 to 8380
Data columns (total 9 columns):
 #   Column          Non-Null Count  Dtype         
---  ------          --------------  -----         
 0   Restaurant      8381 non-null   object        
 1   Reviewer        8381 non-null   object        
 2   Review          8381 non-null   object        
 3   Rating          8380 non-null   float64       
 4   Metadata        8381 non-null   object        
 5   Time            8381 non-null   datetime64[ns]
 6   Pictures        8381 non-null   int64         
 7   Review_Count    8381 non-null   int64         
 8   Follower_Count  8381 non-null   int64         
dtypes: datetime64[ns](1), float64(1), int64(3), object(4)
memory usage: 589.4+ KB
None
        Restaurant              Reviewer  \
0  Beyond Flavours     Rusha Chakraborty   
1  Beyond Flavours  Anusha Tirumalaneedi   

                                              Review  Rating  \
0  The ambience was good, food was qu

In [18]:
df_cleaned.to_csv("restaurant_reviews_cleaned.csv", index=False)


In [19]:
# Load the cleaned CSV uploaded
df = pd.read_csv('restaurant_reviews_cleaned.csv')

df.head()

Unnamed: 0,Restaurant,Reviewer,Review,Rating,Metadata,Time,Pictures,Review_Count,Follower_Count
0,Beyond Flavours,Rusha Chakraborty,"The ambience was good, food was quite good . h...",5.0,"1 Review , 2 Followers",2019-05-25 15:54:00,0,1,2
1,Beyond Flavours,Anusha Tirumalaneedi,Ambience is too good for a pleasant evening. S...,5.0,"3 Reviews , 2 Followers",2019-05-25 14:20:00,0,3,2
2,Beyond Flavours,Ashok Shekhawat,A must try.. great food great ambience. Thnx f...,5.0,"2 Reviews , 3 Followers",2019-05-24 22:54:00,0,2,3
3,Beyond Flavours,Swapnil Sarkar,Soumen das and Arun was a great guy. Only beca...,5.0,"1 Review , 1 Follower",2019-05-24 22:11:00,0,1,1
4,Beyond Flavours,Dileep,Food is good.we ordered Kodi drumsticks and ba...,5.0,"3 Reviews , 2 Followers",2019-05-24 21:37:00,0,3,2


In [20]:
# Install the BigQuery connector
!pip install pandas-gbq --quiet

In [21]:
# Authenticate to access your BigQuery Sandbox
from google.colab import auth
auth.authenticate_user()

In [22]:
# Setup BigQuery Project + Dataset Details
PROJECT_ID = 'pro-router-452116-c3'
DATASET = 'restaurant_reviews_dataset'
TABLE = 'cleaned_reviews'

# Destination format
DESTINATION_TABLE = f"{DATASET}.{TABLE}"

In [23]:
# Push to BigQuery (will create the table and dataset if needed)
to_gbq(df, destination_table=DESTINATION_TABLE, project_id=PROJECT_ID, if_exists='replace')


100%|██████████| 1/1 [00:00<00:00, 12409.18it/s]


In [24]:
# Simulated Task Logger
def log(task_name):
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ✅ Task completed: {task_name}")
    time.sleep(1)  # simulate task delay

In [25]:
# Step 1: Extract
def extract_data():
    print("🔍 Extracting data from CSV...")
    df = pd.read_csv('restaurant_reviews_cleaned.csv')
    log("extract_data")
    return df

In [26]:
# Step 2: Transform (simulate even though it's already clean)
def transform_data(df):
    print("🛠️  Transforming data...")
    df['Time'] = pd.to_datetime(df['Time'], errors='coerce')
    df = df.dropna(subset=['Review', 'Rating', 'Time'])
    df['Rating'] = pd.to_numeric(df['Rating'], errors='coerce')
    df = df.reset_index(drop=True)
    log("transform_data")
    return df

In [27]:
# Step 3: Load to BigQuery
def load_data(df, project_id, dataset, table):
    from pandas_gbq import to_gbq
    destination_table = f"{dataset}.{table}"
    print("📤 Loading data to BigQuery...")
    to_gbq(df, destination_table=destination_table, project_id=project_id, if_exists='replace')
    log("load_data")

In [28]:
# DAG Parameters
PROJECT_ID = 'pro-router-452116-c3'
DATASET = 'restaurant_reviews_dataset'
TABLE = 'cleaned_reviews'

In [29]:
# === Simulated DAG Run ===
print("🚀 Starting ETL DAG simulation...\n")
df_raw = extract_data()
df_transformed = transform_data(df_raw)
load_data(df_transformed, PROJECT_ID, DATASET, TABLE)

print("\n✅ DAG pipeline run completed!")

🚀 Starting ETL DAG simulation...

🔍 Extracting data from CSV...
[2025-06-25 17:26:58] ✅ Task completed: extract_data
🛠️  Transforming data...
[2025-06-25 17:26:59] ✅ Task completed: transform_data


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['Rating'] = pd.to_numeric(df['Rating'], errors='coerce')


📤 Loading data to BigQuery...


100%|██████████| 1/1 [00:00<00:00, 14513.16it/s]


[2025-06-25 17:27:05] ✅ Task completed: load_data

✅ DAG pipeline run completed!
