In [4]:
# Step 1: Import necessary libraries
import pandas as pd
import os

# Step 2: Check uploaded files
print("Uploaded files in Colab:")
print(os.listdir())   # This will show your uploaded CSV

# Step 3: Extract Function
def extract():
    # Use exact filename from uploaded files
    data = pd.read_csv('/content/Ecommerce Customers.csv')   # <-- Replace 'Superstore.csv' with YOUR FILE name if different
    print("✅ Data extracted successfully!")
    return data

# Step 4: Transform Function
def transform(data):
    # Drop rows with missing important values
    data = data.dropna(subset=['Email'])   # Only drop if 'Email' is missing

    # Clean column names
    data.columns = data.columns.str.lower().str.replace(' ', '_')

    # Example: Create a new calculated field
    if 'time_on_app' in data.columns and 'time_on_website' in data.columns:
        data['total_time_spent'] = data['time_on_app'] + data['time_on_website']

    print("✅ Data transformed successfully!")
    return data


# Step 5: Load Function
def load(data):
    data.to_csv('cleaned_data.csv', index=False)
    print("✅ Data loaded successfully! Saved as 'cleaned_data.csv'.")

# Step 6: Run ETL Process
raw_data = extract()
clean_data = transform(raw_data)
load(clean_data)


Uploaded files in Colab:
['.config', 'Ecommerce Customers.csv', 'sample_data']
✅ Data extracted successfully!
✅ Data transformed successfully!
✅ Data loaded successfully! Saved as 'cleaned_data.csv'.


In [5]:
!pip install pyspark




In [6]:
from pyspark.sql import SparkSession

# Start Spark session
spark = SparkSession.builder.master("local[1]").appName('ETL Example').getOrCreate()

# Read CSV using Spark
df_spark = spark.read.csv('cleaned_data.csv', header=True, inferSchema=True)

# Show data
df_spark.show()


+--------------------+--------------------+------------------+-------------------+------------------+------------------+--------------------+-------------------+-----------------+
|               email|             address|            avatar|avg._session_length|       time_on_app|   time_on_website|length_of_membership|yearly_amount_spent| total_time_spent|
+--------------------+--------------------+------------------+-------------------+------------------+------------------+--------------------+-------------------+-----------------+
|mstephenson@ferna...|    835 Frank Tunnel|              NULL|               NULL|              NULL|              NULL|                NULL|               NULL|             NULL|
|         Wrightmouth|      MI 82180-9605"|            Violet|  34.49726772511229|12.655651149166752| 39.57766801952616|   4.082620632952961|  587.9510539684005|52.23331916869291|
|   hduke@hotmail.com|  4547 Archer Common|              NULL|               NULL|              NULL

In [1]:
!pip install apache-airflow




In [2]:
!airflow db init


[91mUsage:[0m [37mairflow db[0m [[36m-h[0m] [36mCOMMAND[0m [36m...[0m

[39mDatabase operations[0m

[91mPositional Arguments:[0m
  [36mCOMMAND[0m
    [36mcheck[0m           [39mCheck if the database can be reached[0m
    [36mcheck-migrations[0m
                    [39mCheck if migration have finished[0m
    [36mclean[0m           [39mPurge old records in metastore tables[0m
    [36mdowngrade[0m       [39mDowngrade the schema of the metadata database.[0m
    [36mdrop-archived[0m   [39mDrop archived tables created through the db clean command[0m
    [36mexport-archived[0m
                    [39mExport archived data from the archive tables[0m
    [36mmigrate[0m         [39mMigrates the metadata database to the latest version[0m
    [36mreset[0m           [39mBurn down and rebuild the metadata database[0m
    [36mshell[0m           [39mRuns a shell to access the database[0m

[91mOptions:[0m
  [36m-h[0m, [36m--help[0m        [39msho

In [3]:
!airflow webserver -p 8080


[91mUsage:[0m [37mairflow[0m [[36m-h[0m] [36mGROUP_OR_COMMAND[0m [36m...[0m

[91mPositional Arguments:[0m
  [36mGROUP_OR_COMMAND[0m

[36m    Groups[0m
      [36massets[0m         [39mManage assets[0m
      [36mbackfill[0m       [39mManage backfills[0m
      [36mconfig[0m         [39mView configuration[0m
      [36mconnections[0m    [39mManage connections[0m
      [36mdags[0m           [39mManage DAGs[0m
      [36mdb[0m             [39mDatabase operations[0m
      [36mjobs[0m           [39mManage jobs[0m
      [36mpools[0m          [39mManage pools[0m
      [36mproviders[0m      [39mDisplay providers[0m
      [36mtasks[0m          [39mManage tasks[0m
      [36mvariables[0m      [39mManage variables[0m

[36m    Commands:[0m
      [36mapi-server[0m     [39mStart an Airflow API server instance[0m
      [36mcheat-sheet[0m    [39mDisplay cheat sheet[0m
      [36mdag-processor[0m  [39mStart a dag processor instance[0m
 

In [15]:
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator  # Updated import
from datetime import datetime
import pandas as pd

# Function to extract data
def extract():
    df = pd.read_csv('/path_to_your_cleaned_data.csv')  # Replace with the actual path
    print("Data extracted successfully")
    return df

# Function to transform data
def transform(df):
    # Perform any transformations or cleaning here
    df['time_on_app'] = df['time_on_app'].fillna(df['time_on_app'].mean())  # Example transformation
    print("Data transformed successfully")
    return df

# Function to load data into another CSV or database
def load(df):
    df.to_csv('/path_to_output/cleaned_data_output.csv', index=False)  # Replace with the actual path
    print("Data loaded successfully")

# Define the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'retries': 1,
}

dag = DAG(
    'ecommerce_etl_pipeline',
    default_args=default_args,
    description='A simple ETL pipeline for Ecommerce data',
    schedule_interval='@daily',  # Run once a day, you can modify this to your need
)

# Define tasks in the DAG
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform,
    op_args=['{{ task_instance.xcom_pull(task_ids="extract_data") }}'],  # Pull data from extract
    dag=dag,
)

load_task = PythonOperator(
    task_id='load_data',
    python_callable=load,
    op_args=['{{ task_instance.xcom_pull(task_ids="transform_data") }}'],  # Pull data from transform
    dag=dag,
)

# Define task dependencies
extract_task >> transform_task >> load_task


TypeError: DAG.__init__() got an unexpected keyword argument 'schedule_interval'

In [16]:
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime

def my_function():
    print("Hello from PythonOperator")

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'start_date': datetime(2025, 4, 26)
}

with DAG(
    dag_id='ecommerce_etl_pipeline',
    default_args=default_args,
    description='Ecommerce ETL pipeline using Airflow',
    schedule='@daily',    # ✅ use schedule instead of schedule_interval
    catchup=False,
) as dag:

    task1 = PythonOperator(
        task_id='print_hello',
        python_callable=my_function
    )


In [17]:
def extract():
    import pandas as pd
    data = pd.read_csv('/content/Ecommerce Customers.csv')
    data.to_csv('/content/raw_data.csv', index=False)
    print("✅ Extracted raw data")

def transform():
    import pandas as pd
    data = pd.read_csv('/content/raw_data.csv')
    data = data.dropna()  # Example cleaning
    data.to_csv('/content/clean_data.csv', index=False)
    print("✅ Transformed data")

def load():
    import pandas as pd
    data = pd.read_csv('/content/clean_data.csv')
    # (Optional) In real-world you would load to Database or Cloud
    print(f"✅ Loaded {len(data)} records successfully!")


In [20]:
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'start_date': datetime(2025, 4, 26)
}

with DAG(
    dag_id='ecommerce_etl_pipeline',
    default_args=default_args,
    description='Simple Ecommerce ETL Pipeline',
    schedule='@daily',
    catchup=False,
) as dag:

    extract_task = PythonOperator(
        task_id='extract_task',
        python_callable=extract
    )

    transform_task = PythonOperator(
        task_id='transform_task',
        python_callable=transform
    )

    load_task = PythonOperator(
        task_id='load_task',
        python_callable=load
    )

    # 👇 Set the order
    extract_task >> transform_task >> load_task


In [21]:
# Manually simulate the ETL steps
extract()
transform()
load()


✅ Extracted raw data
✅ Transformed data
✅ Loaded 500 records successfully!


In [22]:
!git config --global user.email mruduk1812@gmail.com

In [23]:
!git config --global user.name mruduk1812

In [24]:
!apt-get install git


Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
git is already the newest version (1:2.34.1-1ubuntu1.12).
0 upgraded, 0 newly installed, 0 to remove and 34 not upgraded.


In [25]:
!git init


[33mhint: Using 'master' as the name for the initial branch. This default branch name[m
[33mhint: is subject to change. To configure the initial branch name to use in all[m
[33mhint: [m
[33mhint: 	git config --global init.defaultBranch <name>[m
[33mhint: [m
[33mhint: Names commonly chosen instead of 'master' are 'main', 'trunk' and[m
[33mhint: 'development'. The just-created branch can be renamed via this command:[m
[33mhint: [m
[33mhint: 	git branch -m <name>[m
Initialized empty Git repository in /content/.git/


In [26]:
!cp /content/Ecommerce\ Customers.csv .
!cp /content/ ETL_pipeline.ipynb.  # if you have saved your ETL code


cp: '/content/Ecommerce Customers.csv' and './Ecommerce Customers.csv' are the same file
cp: -r not specified; omitting directory '/content/'


In [27]:
!git remote add origin https://github.com/mruduk1812/ecommerce-etl-pipeline.git


In [28]:
!git add .
!git commit -m "First commit from Google Colab"


[master (root-commit) 3a0d40f] First commit from Google Colab
 25 files changed, 55029 insertions(+)
 create mode 100644 .config/.last_opt_in_prompt.yaml
 create mode 100644 .config/.last_survey_prompt.yaml
 create mode 100644 .config/.last_update_check.json
 create mode 100644 .config/active_config
 create mode 100644 .config/config_sentinel
 create mode 100644 .config/configurations/config_default
 create mode 100644 .config/default_configs.db
 create mode 100644 .config/gce
 create mode 100644 .config/hidden_gcloud_config_universe_descriptor_data_cache_configs.db
 create mode 100644 .config/logs/2025.04.24/18.19.17.922226.log
 create mode 100644 .config/logs/2025.04.24/18.19.38.522066.log
 create mode 100644 .config/logs/2025.04.24/18.19.46.929623.log
 create mode 100644 .config/logs/2025.04.24/18.19.48.089267.log
 create mode 100644 .config/logs/2025.04.24/18.19.56.709493.log
 create mode 100644 .config/logs/2025.04.24/18.19.57.353004.log
 create mode 100644 Ecommerce Customers.csv

In [None]:
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

In [32]:
!git push https://mruduk1812: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@github.com/mruduk1812/ecommerce-etl-pipeline.git


remote: Repository not found.
fatal: repository 'https://github.com/mruduk1812/ecommerce-etl-pipeline.git/' not found


In [33]:
!git config --global user.email "mruduk1812@gmail.com"
!git config --global user.name "MrudulaK1812"


In [34]:
!git init
!git add .
!git commit -m "Initial commit - Ecommerce Data ETL Project"


Reinitialized existing Git repository in /content/.git/
On branch master
nothing to commit, working tree clean


In [35]:
!git remote add origin https://MrudulaK1812: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@github.com/MrudulaK1812/ecommerce-data-etl.git


error: remote origin already exists.


In [36]:
!git branch -M main
!git push -u origin main


fatal: could not read Username for 'https://github.com': No such device or address
