# 📊 PhonePe Pulse Data Extraction Pipeline

This notebook implements a structured ETL pipeline for the **PhonePe Pulse Open Data Repository**.

The dataset contains 5000+ deeply nested JSON files organized by:

- State
- Year
- Quarter
- Transaction Type
- User Metrics
- District-Level Data
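The exact file count depends on the snapshot you cloned. A short sketch can tally the JSON files under `pulse/data`, grouped by their first two folders (for example `aggregated/transaction` or `map/transaction`). `count_json_files` is a hypothetical helper, not part of the pipeline.

```python
from collections import Counter
from pathlib import Path


def count_json_files(base_dir):
    """Count JSON files under base_dir, grouped by their first two path components."""
    base_dir = Path(base_dir)
    counts = Counter()
    for file_path in base_dir.rglob("*.json"):
        parts = file_path.relative_to(base_dir).parts
        # ("aggregated", "transaction", ...) -> "aggregated/transaction"
        counts["/".join(parts[:2])] += 1
    return counts
```

After the configuration cell, `count_json_files(BASE_DIR)` returns a `Counter`; `sum(...values())` gives the overall total to compare with the figure above.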

## 🎯 Objectives

This pipeline performs:

1. Recursive directory traversal  
2. JSON normalization  
3. Transaction metric extraction  
4. User metric extraction  
5. District-level extraction  
6. Data cleaning & standardization  
7. CSV export for Power BI modeling  

The final structured datasets are prepared for **Star Schema modeling in Power BI**.
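The recursive traversal in step 1 follows the same `<state>/<year>/<quarter>.json` pattern in every extraction phase. A minimal sketch of that walk as a reusable generator; `iter_quarter_files` is a hypothetical helper, and it assumes year folders and quarter file stems are numeric, silently skipping anything else.

```python
from pathlib import Path


def iter_quarter_files(root):
    """Yield (state, year, quarter, path) for every <state>/<year>/<quarter>.json under root."""
    root = Path(root)
    for state_dir in sorted(p for p in root.iterdir() if p.is_dir()):
        for year_dir in sorted(p for p in state_dir.iterdir() if p.is_dir()):
            if not year_dir.name.isdigit():
                continue  # ignore folders that are not years
            for file_path in sorted(year_dir.glob("*.json")):
                if file_path.stem.isdigit():  # "3.json" -> quarter 3
                    yield state_dir.name, int(year_dir.name), int(file_path.stem), file_path
```

The extraction functions in this notebook inline these loops instead; a generator like this is one way to factor out the shared part.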

In [None]:
import os
import json
import pandas as pd
from pathlib import Path

## ⚙️ Configuration

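Define `BASE_DIR`, the path to the `data` folder inside the cloned PhonePe Pulse repository.

Ensure that the cloned `pulse` folder is located in the same directory as this notebook; the relative path is resolved against the notebook's working directory.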

In [None]:
BASE_DIR = Path("pulse/data")  # Update if your path differs

if not BASE_DIR.exists():
    print("❌ Base directory not found. Ensure you cloned the PhonePe Pulse repo correctly.")
else:
    print("✅ Base directory located successfully.")
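The check above only confirms that `pulse/data` exists. A stricter sketch also verifies the three subfolders the extraction phases read from; `missing_subdirs` and `REQUIRED_SUBDIRS` are hypothetical names.

```python
from pathlib import Path

# Subfolders read by the three extraction phases of this notebook
REQUIRED_SUBDIRS = (
    "aggregated/transaction/country/india/state",
    "aggregated/user/country/india/state",
    "map/transaction/hover/country/india/state",
)


def missing_subdirs(base_dir, required=REQUIRED_SUBDIRS):
    """Return the required subfolders that do not exist under base_dir."""
    base_dir = Path(base_dir)
    return [sub for sub in required if not (base_dir / sub).is_dir()]
```

`missing_subdirs(BASE_DIR)` returns an empty list when every subfolder is present; otherwise it names the ones a phase would report as "Path not found".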

## 🔹 Phase 1: Aggregated Transaction Extraction

This section extracts:

- Transaction Type  
- Transaction Count  
- Transaction Amount  
- State  
- Year  
- Quarter  

Each quarter is stored as `<state>/<year>/<quarter>.json` in nested directories under:

`aggregated/transaction/country/india/state/`
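The extraction reads only a few keys from each file: `data.transactionData[].name` and the `count`/`amount` of the first entry in `paymentInstruments`. A minimal sketch of that mapping on a hand-written payload; the payload mirrors only those keys and its numbers are made up, and `parse_transaction_payload` is a hypothetical name.

```python
def parse_transaction_payload(payload, state, year, quarter):
    """Turn one aggregated transaction payload into a list of row dicts."""
    rows = []
    for item in payload["data"]["transactionData"] or []:
        instruments = item.get("paymentInstruments") or []
        if not instruments:
            continue  # no totals for this transaction type
        first = instruments[0]  # the extraction code reads only the first instrument
        rows.append({
            "State": state,
            "Year": year,
            "Quarter": quarter,
            "Transaction_Type": item["name"],
            "Transaction_Count": first["count"],
            "Transaction_Amount": first["amount"],
        })
    return rows


# Illustrative payload with made-up numbers, shaped like the keys used above
sample = {
    "data": {
        "transactionData": [
            {"name": "Peer-to-peer payments",
             "paymentInstruments": [{"count": 120, "amount": 5400.0}]},
            {"name": "Others", "paymentInstruments": []},
        ]
    }
}

rows = parse_transaction_payload(sample, "goa", 2021, 1)
print(rows[0]["Transaction_Type"], rows[0]["Transaction_Count"])  # Peer-to-peer payments 120
```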

In [None]:
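def extract_aggregated_transactions(base_dir):
    """Flatten aggregated transaction JSON into one row per state, year, quarter and transaction type."""
    path = base_dir / "aggregated/transaction/country/india/state"
    data_list = []

    if not path.exists():
        print(f"Path not found: {path}")
        return pd.DataFrame()

    # Layout: <state>/<year>/<quarter>.json; sorting keeps the row order stable
    for state_path in sorted(p for p in path.iterdir() if p.is_dir()):
        for year_path in sorted(p for p in state_path.iterdir() if p.is_dir()):
            # Only read JSON files; skips stray files such as .DS_Store
            for file_path in sorted(year_path.glob("*.json")):
                try:
                    with open(file_path, "r", encoding="utf-8") as f:
                        data = json.load(f)
                    records = data["data"]["transactionData"] or []
                except (OSError, ValueError, TypeError, KeyError):
                    continue  # unreadable, invalid or unexpected file: skip it

                for item in records:
                    try:
                        data_list.append({
                            "State": state_path.name,
                            "Year": int(year_path.name),
                            # Path.stem drops ".json"; str.strip(".json") strips characters, not a suffix
                            "Quarter": int(file_path.stem),
                            "Transaction_Type": item["name"],
                            "Transaction_Count": item["paymentInstruments"][0]["count"],
                            "Transaction_Amount": item["paymentInstruments"][0]["amount"]
                        })
                    except (TypeError, KeyError, IndexError, ValueError):
                        continue  # skip one malformed entry, keep the rest of the file

    return pd.DataFrame(data_list)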

In [None]:
print("🔄 Extracting Aggregated Transactions...")
df_transactions = extract_aggregated_transactions(BASE_DIR)

if not df_transactions.empty:
    df_transactions.to_csv("phonepe_aggregated_transactions.csv", index=False)
    print("✅ Saved as phonepe_aggregated_transactions.csv")
    print(f"Rows Extracted: {len(df_transactions)}")
else:
    print("⚠️ Extraction returned an empty DataFrame.")
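If each quarter file lists each transaction type once (an assumption about the source worth confirming), every (`State`, `Year`, `Quarter`, `Transaction_Type`) combination should appear a single time in `df_transactions`, and a repeated key would double-count in Power BI measures. A small sketch to surface such rows; `duplicate_keys` is a hypothetical helper.

```python
import pandas as pd


def duplicate_keys(df, keys):
    """Return every row whose combination of `keys` occurs more than once."""
    if df.empty:
        return df
    return df[df.duplicated(subset=keys, keep=False)].sort_values(keys)
```

For example, `duplicate_keys(df_transactions, ["State", "Year", "Quarter", "Transaction_Type"])` should come back empty. The same check with `District` in place of `Transaction_Type` is worth repeating on `df_districts` after cleaning, since stripping the suffix could in principle merge two raw names into one.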

## 🔹 Phase 2: Aggregated User Data Extraction

This section extracts:

- Device Brand  
- User Count  
- Percentage Share  
- State  
- Year  
- Quarter  

Data source:

`aggregated/user/country/india/state/`
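The extraction code tests `usersByDevice` before looping because the field can be empty or null, and such files carry no brand rows. A sketch of the same guard on two hand-written payloads (values made up); `parse_user_payload` is a hypothetical name.

```python
def parse_user_payload(payload, state, year, quarter):
    """Return one row per device brand; a null or empty usersByDevice yields no rows."""
    devices = (payload.get("data") or {}).get("usersByDevice") or []
    return [
        {
            "State": state,
            "Year": year,
            "Quarter": quarter,
            "Brand": d["brand"],
            "User_Count": d["count"],
            "Percentage": d["percentage"],
        }
        for d in devices
    ]


with_brands = {"data": {"usersByDevice": [{"brand": "Xiaomi", "count": 300, "percentage": 0.3}]}}
without_brands = {"data": {"usersByDevice": None}}

print(len(parse_user_payload(with_brands, "goa", 2021, 1)))     # 1
print(len(parse_user_payload(without_brands, "goa", 2022, 1)))  # 0
```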

In [None]:
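def extract_aggregated_users(base_dir):
    """Flatten aggregated user JSON into one row per state, year, quarter and device brand."""
    path = base_dir / "aggregated/user/country/india/state"
    user_list = []

    if not path.exists():
        print(f"Path not found: {path}")
        return pd.DataFrame()

    # Layout: <state>/<year>/<quarter>.json
    for state_path in sorted(p for p in path.iterdir() if p.is_dir()):
        for year_path in sorted(p for p in state_path.iterdir() if p.is_dir()):
            for file_path in sorted(year_path.glob("*.json")):
                try:
                    with open(file_path, "r", encoding="utf-8") as f:
                        data = json.load(f)
                    # usersByDevice may be null or empty; both mean "no brand rows"
                    devices = data["data"]["usersByDevice"] or []
                except (OSError, ValueError, TypeError, KeyError):
                    continue  # unreadable, invalid or unexpected file: skip it

                for brand_data in devices:
                    try:
                        user_list.append({
                            "State": state_path.name,
                            "Year": int(year_path.name),
                            "Quarter": int(file_path.stem),  # "1.json" -> 1
                            "Brand": brand_data["brand"],
                            "User_Count": brand_data["count"],
                            "Percentage": brand_data["percentage"]
                        })
                    except (TypeError, KeyError, ValueError):
                        continue  # skip one malformed entry, keep the rest of the file

    return pd.DataFrame(user_list)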

In [None]:
print("🔄 Extracting Aggregated Users...")
df_users = extract_aggregated_users(BASE_DIR)

if not df_users.empty:
    df_users.to_csv("phonepe_aggregated_users.csv", index=False)
    print("✅ Saved as phonepe_aggregated_users.csv")
    print(f"Rows Extracted: {len(df_users)}")
else:
    print("⚠️ No user data found.")

## 🔹 Phase 3: District-Level Transaction Extraction

This section extracts district-level transaction metrics from:

`map/transaction/hover/country/india/state/`

Fields extracted:

- District  
- Transaction Count  
- Transaction Amount  
- State  
- Year  
- Quarter  
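District files use a different list (`hoverDataList`) and a different metric key (`metric` instead of `paymentInstruments`). The sketch below also shows why it helps to handle errors around each entry rather than the whole file: one malformed district then costs one row, not every district in that quarter. Payload values are made up and `parse_hover_payload` is a hypothetical name.

```python
def parse_hover_payload(payload, state, year, quarter):
    """Return one row per district; malformed entries are skipped individually."""
    rows = []
    for item in (payload.get("data") or {}).get("hoverDataList") or []:
        try:
            metric = item["metric"][0]
            rows.append({
                "State": state,
                "Year": year,
                "Quarter": quarter,
                "District": item["name"],
                "Transaction_Count": metric["count"],
                "Transaction_Amount": metric["amount"],
            })
        except (TypeError, KeyError, IndexError):
            continue  # skip this district only
    return rows


sample = {
    "data": {
        "hoverDataList": [
            {"name": "north goa district", "metric": [{"count": 10, "amount": 250.0}]},
            {"name": "broken entry", "metric": []},  # IndexError -> skipped
            {"name": "south goa district", "metric": [{"count": 7, "amount": 90.0}]},
        ]
    }
}

print([r["District"] for r in parse_hover_payload(sample, "goa", 2021, 1)])
# ['north goa district', 'south goa district']
```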

In [None]:
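def extract_district_map_transactions(base_dir):
    """Flatten district hover JSON into one row per state, year, quarter and district."""
    path = base_dir / "map/transaction/hover/country/india/state"
    data_list = []

    if not path.exists():
        print(f"Path not found: {path}")
        return pd.DataFrame()

    # Layout: <state>/<year>/<quarter>.json
    for state_path in sorted(p for p in path.iterdir() if p.is_dir()):
        for year_path in sorted(p for p in state_path.iterdir() if p.is_dir()):
            for file_path in sorted(year_path.glob("*.json")):
                try:
                    with open(file_path, "r", encoding="utf-8") as f:
                        data = json.load(f)
                    districts = data["data"]["hoverDataList"] or []
                except (OSError, ValueError, TypeError, KeyError):
                    continue  # unreadable, invalid or unexpected file: skip it

                for item in districts:
                    try:
                        data_list.append({
                            "State": state_path.name,
                            "Year": int(year_path.name),
                            "Quarter": int(file_path.stem),  # "1.json" -> 1
                            "District": item["name"],
                            "Transaction_Count": item["metric"][0]["count"],
                            "Transaction_Amount": item["metric"][0]["amount"]
                        })
                    except (TypeError, KeyError, IndexError, ValueError):
                        continue  # skip one malformed entry, keep the rest of the file

    return pd.DataFrame(data_list)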

In [None]:
print("🔄 Extracting District-Level Data...")
df_districts = extract_district_map_transactions(BASE_DIR)

if not df_districts.empty:
    df_districts.to_csv("phonepe_district_map_transactions.csv", index=False)
    print("✅ Saved as phonepe_district_map_transactions.csv")
    print(f"Rows Extracted: {len(df_districts)}")
else:
    print("⚠️ No district data found.")

## 🧹 Phase 4: Data Cleaning & Standardization

Standardization steps:

- Replace hyphens in state names with spaces  
- Convert state and district names to title case  
- Remove the trailing "district" suffix from district names (in any letter case)  
- Trim surrounding whitespace  

In [None]:
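def clean_state_names(df):
    """Turn folder slugs such as 'andaman-&-nicobar-islands' into 'Andaman & Nicobar Islands'."""
    if df.empty:
        return df  # nothing was extracted, so there is no State column to clean
    df["State"] = df["State"].str.replace("-", " ", regex=False).str.strip().str.title()
    return df

def clean_district_names(df):
    """Drop a trailing 'district' suffix in any letter case, then trim and title-case."""
    if df.empty:
        return df
    # Raw names may be lowercase ('... district'), which a case-sensitive
    # replace of " District" would miss
    df["District"] = (
        df["District"]
        .str.strip()
        .str.replace(r"\s+district$", "", case=False, regex=True)
        .str.strip()
        .str.title()
    )
    return df

df_transactions = clean_state_names(df_transactions)
df_users = clean_state_names(df_users)
df_districts = clean_state_names(df_districts)
df_districts = clean_district_names(df_districts)

# Overwrite the raw exports with the cleaned data; empty frames are not written
outputs = {
    "phonepe_aggregated_transactions.csv": df_transactions,
    "phonepe_aggregated_users.csv": df_users,
    "phonepe_district_map_transactions.csv": df_districts,
}
for filename, frame in outputs.items():
    if not frame.empty:
        frame.to_csv(filename, index=False)

print("✅ Data cleaning completed successfully.")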
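Because the three CSVs will share state values in Power BI, it can help to confirm after cleaning that they use the same spellings. A sketch that lists, for each dataset, the state names found elsewhere but absent from it; `missing_states` is a hypothetical helper.

```python
import pandas as pd


def missing_states(frames):
    """For each non-empty frame, list State values found in another frame but not in this one."""
    present = {
        name: set(df["State"].dropna())
        for name, df in frames.items()
        if not df.empty and "State" in df.columns
    }
    every_state = set().union(*present.values())
    return {name: sorted(every_state - states) for name, states in present.items()}
```

Calling `missing_states({"transactions": df_transactions, "users": df_users, "districts": df_districts})` returns empty lists when the spellings line up. A name listed here may also just be absent from that dataset, so treat it as a prompt to inspect rather than proof of a spelling error.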

## 🚀 ETL Pipeline Completed Successfully

Generated structured datasets:

- phonepe_aggregated_transactions.csv
- phonepe_aggregated_users.csv
- phonepe_district_map_transactions.csv

These datasets are now ready for:

- Star Schema Modeling  
- DAX Calculations  
- Shape Map Integration  
- Forecasting  
- Decomposition Tree Analysis  
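In a star schema the three CSVs act as fact tables, related through shared dimension tables. A minimal sketch that derives State and Period dimensions with pandas before import, assuming the column names produced above; `build_dimensions`, the key columns (`State_Key`, `Period_Key`, `Period_Label`) and the `Year * 10 + Quarter` key are illustrative choices, not part of this pipeline.

```python
import pandas as pd


def build_dimensions(facts):
    """Build State and Period dimension tables from the union of several fact frames."""
    frames = [df[["State", "Year", "Quarter"]] for df in facts if not df.empty]
    if not frames:
        raise ValueError("all fact frames are empty")
    combined = pd.concat(frames, ignore_index=True)

    # One row per state, with a simple integer surrogate key
    dim_state = combined[["State"]].drop_duplicates().sort_values("State").reset_index(drop=True)
    dim_state["State_Key"] = dim_state.index + 1

    # One row per (Year, Quarter); 2021 Q3 -> key 20213, label "2021-Q3"
    dim_period = (
        combined[["Year", "Quarter"]]
        .drop_duplicates()
        .sort_values(["Year", "Quarter"])
        .reset_index(drop=True)
    )
    dim_period["Period_Key"] = dim_period["Year"] * 10 + dim_period["Quarter"]
    dim_period["Period_Label"] = (
        dim_period["Year"].astype(str) + "-Q" + dim_period["Quarter"].astype(str)
    )
    return dim_state, dim_period
```

`dim_state, dim_period = build_dimensions([df_transactions, df_users, df_districts])` could then be exported with `to_csv` next to the fact files. Power BI relationships use a single column, so a combined key such as `Period_Key` would also need to be added to each fact table before relating them.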

### 🔗 Next Step

Import the generated CSV files into Power BI and apply DAX modeling as documented in:

`Scripts/DAX-Measures.md`