In [47]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

# PhonePe Pulse API Ingestion â€“ Users (Aggregated)

This notebook performs API-based ingestion of aggregated user data from the official PhonePe Pulse repository.

Objective:
- Fetch quarterly user JSON files
- Extract registered users and app opens
- Parse device-wise distribution (if available)
- Convert nested JSON into structured tabular format
- Export clean CSV for warehouse loading


## 1. Import Required Libraries

In this step, we import libraries required for:

- Sending API requests
- Handling JSON responses
- Structuring data using Pandas
- Exporting cleaned CSV files

These libraries support the ETL ingestion process.


In [48]:
import requests
import json
import pandas as pd
import os
from time import sleep

## 2. Define Users API Source

We define the base GitHub raw URL for the aggregated users dataset.

Source:
Official PhonePe Pulse public repository

Dataset Path:
aggregated/user/state/

This dataset provides:
- Registered users
- App opens
- Device-level distribution (brand data)


## 3. Define Time Coverage

We define the year and quarter range for ingestion.

The loop dynamically constructs URLs for:
- Each state
- Each year
- Each quarter

This ensures complete coverage of the available dataset.


In [49]:
USERS_BASE_URL = (
    "https://raw.githubusercontent.com/PhonePe/pulse/master/"
    "data/aggregated/user/country/india/state"
)

START_YEAR = 2018
END_YEAR = 2024
QUARTERS = [1, 2, 3, 4]


In [50]:
USERS_RAW_DIR = "/kaggle/working/users_raw"
os.makedirs(USERS_RAW_DIR, exist_ok=True)

print("Users raw folder ready:", USERS_RAW_DIR)

Users raw folder ready: /kaggle/working/users_raw


In [51]:
test_url = (
    "https://raw.githubusercontent.com/PhonePe/pulse/master/"
    "data/aggregated/user/country/india/state/"
    "maharashtra/2018/1.json"
)

r = requests.get(test_url)

print("Status code:", r.status_code)

if r.status_code == 200:
    data = r.json()
    print("Top-level keys:", data.keys())
    print("Data keys:", data["data"].keys())
else:
    print("Response preview:", r.text[:200])


Status code: 200
Top-level keys: dict_keys(['success', 'code', 'data', 'responseTimestamp'])
Data keys: dict_keys(['aggregated', 'usersByDevice'])


In [52]:
import requests

states_url = (
    "https://api.github.com/repos/PhonePe/pulse/contents/"
    "data/aggregated/user/country/india/state"
)

response = requests.get(states_url)

if response.status_code == 200:
    states = sorted([item["name"] for item in response.json()])
    print("Total states found:", len(states))
    print("Sample states:", states[:10])
else:
    raise Exception("Failed to fetch states list")


Total states found: 36
Sample states: ['andaman-&-nicobar-islands', 'andhra-pradesh', 'arunachal-pradesh', 'assam', 'bihar', 'chandigarh', 'chhattisgarh', 'dadra-&-nagar-haveli-&-daman-&-diu', 'delhi', 'goa']


## 4. Fetch User JSON Data

For each state, year, and quarter:

- Construct API URL
- Send request
- Validate response status
- Parse JSON only if response is successful

This prevents failures due to missing files.


In [53]:
failed_requests = []
saved_files = 0

for state in states:
    for year in range(START_YEAR, END_YEAR + 1):
        for quarter in QUARTERS:

            url = f"{USERS_BASE_URL}/{state}/{year}/{quarter}.json"
            filename = f"{state}_{year}_Q{quarter}.json"
            filepath = os.path.join(USERS_RAW_DIR, filename)

            r = requests.get(url)

            if r.status_code == 200:
                with open(filepath, "w") as f:
                    json.dump(r.json(), f)
                saved_files += 1
            else:
                failed_requests.append({
                    "state": state,
                    "year": year,
                    "quarter": quarter,
                    "status": r.status_code
                })

            sleep(0.1)  # polite rate limiting


## 5. Inspect JSON Structure

Before flattening, we inspect a sample JSON file to understand its structure.

The dataset contains:

- registeredUsers
- appOpens
- usersByDevice (optional / sometimes null)

This inspection ensures accurate parsing.


In [54]:
files = os.listdir(USERS_RAW_DIR)
print("Total raw users files:", len(files))
print("Sample files:", files[:5])


Total raw users files: 1008
Sample files: ['himachal-pradesh_2023_Q1.json', 'uttarakhand_2022_Q4.json', 'odisha_2024_Q4.json', 'tamil-nadu_2023_Q1.json', 'goa_2022_Q1.json']


In [55]:
sample_file = os.listdir(USERS_RAW_DIR)[0]
sample_file

'himachal-pradesh_2023_Q1.json'

In [56]:
with open(os.path.join(USERS_RAW_DIR, sample_file), "r") as f:
    sample_json = json.load(f)

sample_json.keys()


dict_keys(['success', 'code', 'data', 'responseTimestamp'])

In [57]:
sample_json["data"].keys()

dict_keys(['aggregated', 'usersByDevice'])

In [58]:
sample_json["data"]["aggregated"]

{'registeredUsers': 2594668, 'appOpens': 111702044}

## 6. Convert Parsed Records into DataFrame

After extracting structured user records,
we convert the list into a Pandas DataFrame.

This enables:
- Validation
- Transformation
- Export for warehouse loading


In [60]:
users_core_records = []

for file in os.listdir(USERS_RAW_DIR):
    with open(os.path.join(USERS_RAW_DIR, file), "r") as f:
        data = json.load(f)

    parts = file.replace(".json", "").split("_")
    state = parts[0]
    year = int(parts[1])
    quarter = int(parts[2].replace("Q", ""))

    aggregated = data["data"]["aggregated"]

    users_core_records.append({
        "state": state,
        "year": year,
        "quarter": quarter,
        "registered_users": aggregated.get("registeredUsers"),
        "app_opens": aggregated.get("appOpens")
    })

users_core_df = pd.DataFrame(users_core_records)


## 7. Perform Data Validation Checks

We validate:

- Row count
- Column names
- Data types
- Missing values

This confirms ingestion accuracy before exporting.


In [61]:
users_core_df.shape

(1008, 5)

In [62]:
users_core_df.isna().sum()

state               0
year                0
quarter             0
registered_users    0
app_opens           0
dtype: int64

In [63]:
users_core_df.sort_values(["state", "year", "quarter"]).head()

Unnamed: 0,state,year,quarter,registered_users,app_opens
42,andaman-&-nicobar-islands,2018,1,6740,0
605,andaman-&-nicobar-islands,2018,2,9405,0
216,andaman-&-nicobar-islands,2018,3,12149,0
779,andaman-&-nicobar-islands,2018,4,15222,0
856,andaman-&-nicobar-islands,2019,1,18596,0


## 8. Feature Engineering

In this step, we enhance the dataset by:

- Adding region classification for each state  
- Structuring columns properly  
- Sorting data by state, year, and quarter  

These improvements prepare the dataset for SQL warehouse loading.


In [64]:
state_to_region = {
    # North
    "jammu-&-kashmir": "North",
    "himachal-pradesh": "North",
    "punjab": "North",
    "haryana": "North",
    "uttarakhand": "North",
    "uttar-pradesh": "North",
    "delhi": "North",
    "chandigarh": "North",

    # South
    "andhra-pradesh": "South",
    "telangana": "South",
    "karnataka": "South",
    "tamil-nadu": "South",
    "kerala": "South",
    "puducherry": "South",
    "lakshadweep": "South",

    # West
    "maharashtra": "West",
    "gujarat": "West",
    "rajasthan": "West",
    "goa": "West",
    "dadra-&-nagar-haveli-&-daman-&-diu": "West",

    # East
    "west-bengal": "East",
    "odisha": "East",
    "bihar": "East",
    "jharkhand": "East",

    # Central
    "madhya-pradesh": "Central",
    "chhattisgarh": "Central",

    # North-East
    "assam": "North-East",
    "meghalaya": "North-East",
    "manipur": "North-East",
    "mizoram": "North-East",
    "nagaland": "North-East",
    "tripura": "North-East",
    "arunachal-pradesh": "North-East",
    "sikkim": "North-East",

    # UTs
    "andaman-&-nicobar-islands": "North-East",
    "ladakh": "North",
}


In [65]:
users_core_df["region"] = users_core_df["state"].map(state_to_region)

In [66]:
users_core_df["region"].isna().sum()

np.int64(0)

In [67]:
users_core_df = users_core_df[
    ["state", "region", "year", "quarter", "registered_users", "app_opens"]
]


In [68]:
users_core_df.groupby("year")["quarter"].nunique()

year
2018    4
2019    4
2020    4
2021    4
2022    4
2023    4
2024    4
Name: quarter, dtype: int64

In [69]:
users_core_df = (
    users_core_df
    .sort_values(["state", "year", "quarter"])
    .reset_index(drop=True)
)


In [70]:
users_core_df.shape

(1008, 6)

In [71]:
users_core_df.isna().sum()

state               0
region              0
year                0
quarter             0
registered_users    0
app_opens           0
dtype: int64

In [72]:
users_core_df.head()

Unnamed: 0,state,region,year,quarter,registered_users,app_opens
0,andaman-&-nicobar-islands,North-East,2018,1,6740,0
1,andaman-&-nicobar-islands,North-East,2018,2,9405,0
2,andaman-&-nicobar-islands,North-East,2018,3,12149,0
3,andaman-&-nicobar-islands,North-East,2018,4,15222,0
4,andaman-&-nicobar-islands,North-East,2019,1,18596,0


In [73]:
users_core_df.tail()

Unnamed: 0,state,region,year,quarter,registered_users,app_opens
1003,west-bengal,East,2023,4,30064546,1467442959
1004,west-bengal,East,2024,1,31306843,633526507
1005,west-bengal,East,2024,2,32540397,704276274
1006,west-bengal,East,2024,3,33612828,709864323
1007,west-bengal,East,2024,4,34750181,1994940763


## 9. Export Structured Users Dataset

The cleaned user dataset is exported as CSV.

This file will serve as input for:

- SQL warehouse loading
- Adoption analysis
- Engagement intensity metrics


In [74]:
users_core_df.to_csv(
    "/kaggle/working/phonepe_users_aggregated.csv",
    index=False
)

print("Users Core CSV saved (without region enrichment)")


Users Core CSV saved (without region enrichment)


## Conclusion

This notebook successfully:

- Retrieved quarterly user data
- Extracted core adoption metrics
- Parsed device-level distribution
- Handled null scenarios safely
- Generated warehouse-ready CSV

This completes the user ingestion phase of the project.
