# Clone the Data from Github

In [None]:
import zipfile
import requests
from io import BytesIO

url = "https://github.com/PhonePe/pulse/archive/refs/heads/main.zip"
response = requests.get(url)

with zipfile.ZipFile(BytesIO(response.content)) as zip_ref:
    zip_ref.extractall("your directory")


# Explore the Data

In [None]:
import json

with open("C:\\Users\\rahul\\Data Science\\pulse-master\\data\\map\\insurance\\country\\india\\state\\jharkhand\\2024\\1.json", "r") as f:
    data = json.load(f)

# Explore it
print(data)

# Convert the json Data to csv 

In [None]:
from pathlib import Path
import json
import pandas as pd

Base_path = Path(r"C:\Users\rahul\Data Science\pulse-master\data")
OUTPUT_DIR = Path(r"C:\Users\rahul\Data Science\pulse-master\csv_output")
OUTPUT_DIR.mkdir(exist_ok=True) 

# Containers for each CSV
agg_transactions = []
agg_users = []
agg_insurance = []
map_transactions = []
map_users = []
map_insurance_hover = []
map_insurance_country = []
top_transactions = []
top_users = []
top_insurance = []

def extract_meta (parts):
    """Extract year, quarter, location, level from file path parts."""
    section = parts[0] #agrregated/map/top
    subtype = parts[1] #transcation/user/top
    level = parts[-4]  #country/state
    location = parts[-3] #india/state name
    year =  parts[-2] # it return the year e.g 2018 or else none
    quarter = parts[-1].replace(".json","") #we are repacing the extension of json to text

    return section,subtype,level,location,year,quarter


#walk trough each json file
for json_file in Base_path.rglob("*.json"):
    try:
        with open(json_file,"r",encoding = "utf-8") as f:
            data = json.load(f)

    except json.JSONDecodeError:
        print(f"❌ could not read {json_file}")
        continue

    parts = json_file.relative_to(Base_path).parts
    section,subtype,level,location,year,quarter = extract_meta(parts)

     # === Aggregated ===
    if section == "aggregated":
        if subtype == "transaction":
            for entry in data["data"].get("transactionData",[]):
                for pi in entry.get("paymentInstruments",[]):
                    agg_transactions.append({
                        "year":year,
                        "quarter": quarter,
                        "country_state_level": level,
                        "location": location,
                        "category": entry.get("name"),
                        "transaction_count": pi.get("count"),
                        "transaction_amount": pi.get("amount")
                    })
        
        elif subtype == "user":
            aggregated = data["data"].get("aggregated",[])
            devices = data["data"].get("usersByDevice",[])
            # Case 1: If devices is a list -> loop normally
            if isinstance(devices,list):
                for device in devices:
                    agg_users.append({
                        "year":year,
                        "quarter": quarter,
                        "country_state_level": level,
                        "location": location,
                        "registered_users": aggregated.get("registeredUsers"),
                        "app_opens": aggregated.get("appOpens"),
                        "brand": device.get("brand"),
                        "brand_count": device.get("count"),
                        "brand_percentage": device.get("percentage")
                })

            # Case 2: If devices is None -> still save aggregated data (without brand info)
            else:
                 agg_users.append({
                     "year": year,
                     "quarter": quarter,
                     "country_state_level": level,
                     "location": location,
                     "registered_users": aggregated.get("registeredUsers"),
                     "app_opens": aggregated.get("appOpens"),
                     "brand": "Unknown",
                     "brand_count": 0,
                     "brand_percentage": 0
                 })

        elif subtype == "insurance":
            for entry in data["data"].get("transactionData",[]):
                for pi in entry.get("paymentInstruments",[]):
                    agg_insurance.append({
                        "year":year,
                        "quarter": quarter,
                        "country_state_level": level,
                        "location": location,
                        "transaction_count": pi.get("count"),
                        "transaction_amount": pi.get("amount")
                    })

    #  === Map ===
    elif section == "map":
        if subtype == "transaction":
            for entry in data["data"].get("hoverDataList",[]):
                for metric in entry.get("metric",[]):
                    map_transactions.append({
                        "year":year,
                        "quarter": quarter,
                        "country_state_level": level,
                        "location": location,
                        "district": entry.get("name"),
                        "transaction_count": metric.get("count"),
                        "transaction_amount": metric.get("amount")
                    })

        elif subtype == "user":
            for name,vals in data["data"].get("hoverData",{}).items():
                map_users.append({
                    "year": year,
                    "quarter": quarter,
                    "country_state_level": level,
                    "location": location,
                    "registeredUsers": vals.get("registeredUsers"),
                    "app_opens": vals.get("appOpens")
                })

        elif subtype == "insurance":
            # Case 1: lat/lng/metric/label format (columns + data)
            if "columns" in data.get("data", {}).get("data", {}):
                for row in data["data"]["data"].get("data", []):
                    if len(row) == 4:
                        lat, lng, metric, label = row
                        map_insurance_hover.append({
                            "year": year,
                            "quarter": quarter,
                            "country_state_level": level,
                            "location": location,
                            "lat": lat,
                            "lng": lng,
                            "metric": metric,
                            "label": label
                        })

            elif "hoverDataList" in data.get("data", {}):
                for entry in data["data"].get("hoverDataList", []):
                    name = entry.get("name")
                    for metric in entry.get("metric", []):
                         map_insurance_country.append({
                             "year": year,
                             "quarter": quarter,
                             "country_state_level": level,
                             "location": location,
                             "district": name,
                             "transaction_count": metric.get("count"),
                             "transaction_amount": metric.get("amount")
                         })

    # === Top ===
    elif section == "top":
        if subtype == "transaction":
            for group_name in ["states","districts","pincodes"]:
                for entry in data["data"].get(group_name) or []:
                    metric = entry.get("metric",{})
                    top_transactions.append({
                        "year":year,
                        "quarter": quarter,
                        "country_state_level": level,
                        "location": location,
                        "state_dis_pin": group_name[:-1],  # remove plural
                        "state_dis_pin_name": entry.get("entityName"),
                        "transaction_count": metric.get("count"),
                        "transaction_amount": metric.get("amount")
                    })

        elif subtype == "user":
            for group_name in ["states", "districts", "pincodes"]:
                 for entry in data["data"].get(group_name) or []:
                      top_users.append({
                          "year":year,
                          "quarter": quarter,
                          "country_state_level": level,
                          "location": location,
                          "state_dis_pin": group_name[:-1],  # remove plural
                          "state_dis_pin_name": entry.get("name"),
                          "registeredUsers": entry.get("registeredUsers") 

                      })

        elif subtype == "insurance":
            data_dict = data.get("data",{})
            if isinstance(data_dict, dict):
                for group_name in ["states", "districts", "pincodes"]:
                    for entry in data_dict.get(group_name) or []:
                        metric = entry.get("metric", {})
                        top_insurance.append({
                            "year":year,
                            "quarter": quarter,
                            "country_state_level": level,
                            "location": location,
                            "state_dis_pin": group_name[:-1],  # remove plural
                            "state_dis_pin_name": entry.get("entityName"),
                            "transaction_count": metric.get("count"),
                            "transaction_amount": metric.get("amount")
                     })
                     
                         
# === Save all CSVs ===
pd.DataFrame(agg_transactions).to_csv(OUTPUT_DIR / "aggregated_transactions.csv", index=False)
pd.DataFrame(agg_users).to_csv(OUTPUT_DIR / "aggregated_users.csv", index=False)
pd.DataFrame(agg_insurance).to_csv(OUTPUT_DIR / "aggregated_insurance.csv", index=False)
pd.DataFrame(map_transactions).to_csv(OUTPUT_DIR / "map_transactions.csv", index=False)
pd.DataFrame(map_users).to_csv(OUTPUT_DIR / "map_users.csv", index=False)
pd.DataFrame(map_insurance_hover).to_csv(OUTPUT_DIR / "map_insurance_hover.csv", index=False)
pd.DataFrame(map_insurance_country).to_csv(OUTPUT_DIR / "map_insurance_country.csv", index=False)
pd.DataFrame(top_transactions).to_csv(OUTPUT_DIR / "top_transactions.csv", index=False)
pd.DataFrame(top_users).to_csv(OUTPUT_DIR / "top_users.csv", index=False)
pd.DataFrame(top_insurance).to_csv(OUTPUT_DIR / "top_insurance.csv", index=False)

print("✅ All CSV files created in:", OUTPUT_DIR.resolve())
                        

# Create the tables in Sql Database

In [None]:
import mysql.connector

db_config = {
    "host": "localhost",
    "user": "root",
    "password": "root",
    "database": "phonepe_pulse"
}

conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()

query = [
    """CREATE TABLE aggregated_transactions (
        id INT AUTO_INCREMENT PRIMARY KEY,
        year INT,
        quarter INT,
        country_state_level VARCHAR(100),
        location VARCHAR(100),
        category VARCHAR(50),         -- recharge, p2p, merchant, etc.
        transaction_count BIGINT,
        transaction_amount BIGINT
    );""",
    
    """CREATE TABLE aggregated_insurance (
        id INT AUTO_INCREMENT PRIMARY KEY,
        year INT,
        quarter INT,
        country_state_level VARCHAR(100),
        location VARCHAR(100),
        transaction_count BIGINT,
        transaction_amount BIGINT
    );""",
    
    """CREATE TABLE aggregated_users (
        id INT AUTO_INCREMENT PRIMARY KEY,
        year INT,
        quarter INT,
        country_state_level VARCHAR(100),
        location VARCHAR(100),
        registered_users BIGINT,
        app_opens BIGINT,
        brand VARCHAR(50),
        brand_count BIGINT,
        brand_percentage DECIMAL(12,9)
    );""",
    
    """CREATE TABLE map_transactions (
        id INT AUTO_INCREMENT PRIMARY KEY,
        year INT,
        quarter INT,
        country_state_level VARCHAR(100),
        location VARCHAR(100),
        district VARCHAR(100),
        transaction_count BIGINT,
        transaction_amount BIGINT
    );""",
    
    """CREATE TABLE map_users (
        id INT AUTO_INCREMENT PRIMARY KEY,
        year INT,
        quarter INT,
        country_state_level VARCHAR(100),
        location VARCHAR(100),
        registeredUsers BIGINT,
        app_opens BIGINT
    );""",
    
    """CREATE TABLE map_insurance_country (
        id INT AUTO_INCREMENT PRIMARY KEY,
        year INT,
        quarter INT,
        country_state_level VARCHAR(100),
        location VARCHAR(100),
        district VARCHAR(100),
        transaction_count BIGINT,
        transaction_amount BIGINT
    );""",
    
    """CREATE TABLE map_insurance_hover (
        id INT AUTO_INCREMENT PRIMARY KEY,
        year INT,
        quarter INT,
        country_state_level VARCHAR(100),
        location VARCHAR(100),
        district VARCHAR(100),
        lat DOUBLE,
        lng DOUBLE,
        metric BIGINT
    );""",
    
    """CREATE TABLE top_transactions (
        id INT AUTO_INCREMENT PRIMARY KEY,
        year INT,
        quarter INT,
        country_state_level VARCHAR(100),
        location VARCHAR(100),
        state_dis_pin VARCHAR(20),      -- district/pincode
        state_dis_pin_name VARCHAR(100),     -- district name or pincode
        transaction_count BIGINT,
        transaction_amount BIGINT
    );""",
    
    """CREATE TABLE top_insurance (
        id INT AUTO_INCREMENT PRIMARY KEY,
        year INT,
        quarter INT,
        country_state_level VARCHAR(100),
        location VARCHAR(100),
        state_dis_pin VARCHAR(20),      -- district/pincode
        state_dis_pin_name VARCHAR(100),
        transaction_count BIGINT,
        transaction_amount BIGINT
    );""",
    
    """CREATE TABLE top_users (
        id INT AUTO_INCREMENT PRIMARY KEY,
        year INT,
        quarter INT,
        country_state_level VARCHAR(100),
        location VARCHAR(100),
        state_dis_pin VARCHAR(20),      -- district/pincode
        state_dis_pin_name VARCHAR(100),
        registeredUsers BIGINT
    );"""
]

for q in query:
    cursor.execute(q)

print("✅ Tables created successfully!")

cursor.close()
conn.close()


# To upload the csv data in table, created in sql Database

In [None]:
import mysql.connector
import pandas as pd

def insert_csv_to_mysql(csv_path, table_name, db_config):
    """
    Insert CSV data into a MySQL table safely, handling NULLs correctly.
    """
    try:
        # Connect to MySQL
        conn = mysql.connector.connect(**db_config)
        cursor = conn.cursor()

        # Load CSV
        df = pd.read_csv(csv_path)

        # Convert NaN/NaT to None (MySQL NULL)
        df = df.where(pd.notnull(df), None)

        # Get actual table columns from MySQL
        cursor.execute(f"DESCRIBE {table_name}")
        table_columns = [col[0] for col in cursor.fetchall()]

        # Keep only common columns
        common_cols = [col for col in df.columns if col in table_columns]
        if not common_cols:
            print(f"⚠️ No matching columns for {table_name}. Skipping.")
            return

        df = df[common_cols]

        # Build query dynamically
        columns = ", ".join([f"`{col}`" for col in common_cols])
        placeholders = ", ".join(["%s"] * len(common_cols))
        insert_query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"

        # Replace all NaN/NaT with None again (important for executemany)
        data = [tuple(None if pd.isna(x) else x for x in row) for row in df.to_numpy()]

        # Bulk insert
        batch_size = 5000
        for i in range(0, len(data), batch_size):
            cursor.executemany(insert_query, data[i:i+batch_size])

        # Commit
        conn.commit()
        print(f"✅ {len(df)} rows from {csv_path} inserted into {table_name}")

    except Exception as e:
        print(f"❌ Error with {csv_path}: {e}")

    finally:
        cursor.close()
        conn.close()

# ---------- USAGE ----------

db_config = {
    "host": "localhost",
    "user": "root",
    "password": "root",
    "database": "phonepe_pulse"
}

# aggregated_transactions
insert_csv_to_mysql(
    csv_path="C:/Users/rahul/Data Science/pulse-master/csv_output/aggregated_transactions.csv",
    table_name="aggregated_transactions",
    db_config=db_config
)

# aggregated_insurance
insert_csv_to_mysql(
    csv_path="C:/Users/rahul/Data Science/pulse-master/csv_output/aggregated_insurance.csv",
    table_name="aggregated_insurance",
    db_config=db_config
)

# aggregated_users
insert_csv_to_mysql(
    csv_path="C:/Users/rahul/Data Science/pulse-master/csv_output/aggregated_users.csv",
    table_name="aggregated_users",
    db_config=db_config
)
# map_insurance_country
insert_csv_to_mysql(
    csv_path="C:/Users/rahul/Data Science/pulse-master/csv_output/map_insurance_country.csv",
    table_name="map_insurance_country",
    db_config=db_config
)

# map_insurance_hover
insert_csv_to_mysql(
    csv_path="C:/Users/rahul/Data Science/pulse-master/csv_output/map_insurance_hover.csv",
    table_name="map_insurance_hover",
    db_config=db_config
)

# map_transactions
insert_csv_to_mysql(
    csv_path="C:/Users/rahul/Data Science/pulse-master/csv_output/map_transactions.csv",
    table_name="map_transactions",
    db_config=db_config
)

# map_users
insert_csv_to_mysql(
    csv_path="C:/Users/rahul/Data Science/pulse-master/csv_output/map_users.csv",
    table_name="map_users",
    db_config=db_config
)

# top_insurance
insert_csv_to_mysql(
    csv_path="C:/Users/rahul/Data Science/pulse-master/csv_output/top_insurance.csv",
    table_name="top_insurance",
    db_config=db_config
)

# top_transactions
insert_csv_to_mysql(
    csv_path="C:/Users/rahul/Data Science/pulse-master/csv_output/top_transactions.csv",
    table_name="top_transactions",
    db_config=db_config
)

# top_users
insert_csv_to_mysql(
    csv_path="C:/Users/rahul/Data Science/pulse-master/csv_output/top_users.csv",
    table_name="top_users",
    db_config=db_config
)