In [1]:
# Cell 1: Imports, the data folder location, and the load_regional helper.

import os              #A tool to understand folder paths (like a GPS).
import glob            #A tool to find all files that match a pattern (like a searchlight).
import json            #A special tool to read the JSON file format.
import pandas as pd    #Our most important tool for creating data tables (like a super-smart spreadsheet).

# The main address where all the data is stored.
# Set the PULSE_DATA_DIR environment variable to point at another copy of the
# data; when it is not set, the original local path below is used.
DATA_DIR = os.environ.get("PULSE_DATA_DIR", r"F:\pulse-master\data")

# Here, we define the robot and give it its instructions.
def load_regional(category, data_type, geo_level):
    """
    Load every quarterly JSON file for one category / data type / geo level
    into a single pandas DataFrame.

    Files are searched under
    DATA_DIR/<category>/<data_type>/country/india/<geo_level>/<region>/<year>/<quarter>.json
    (with an extra 'hover' folder right after <data_type> for map/transaction
    and map/user).

    Parameters
    ----------
    category : str
        First folder below DATA_DIR (the notebook uses "aggregated", "map", "top").
    data_type : str
        Folder below the category ("transaction", "user", "insurance").
    geo_level : str
        Folder name of the geographic level (the notebook uses "state"); it is
        also the name of the column that receives the region folder name.

    Returns
    -------
    pandas.DataFrame
        One row per record found in the files. Column names are stripped,
        lower-cased and have spaces replaced by underscores. The columns
        <geo_level>, "year", "quarter" and "date" (first day of the quarter)
        are added. An empty DataFrame is returned when no file matches.

    Notes
    -----
    Files that cannot be parsed, or whose folder/file name is not a number
    (year / quarter), are reported and skipped instead of aborting the load.
    """
    path = os.path.join(DATA_DIR, category, data_type)

    # map/transaction and map/user keep their files one level deeper, in 'hover'.
    if category == 'map' and data_type in ('transaction', 'user'):
        path = os.path.join(path, 'hover')

    pattern = os.path.join(path, "country", "india", geo_level, "*", "*", "*.json")

    # Sorted so the row order is the same on every run and every machine.
    files = sorted(glob.glob(pattern))
    if not files:
        print(f"--> INFO: No files found for {category}/{data_type}/{geo_level}. Skipping.")
        return pd.DataFrame()

    records = []
    for fp in files:
        # The pattern has a fixed depth (<region>/<year>/<quarter>.json), so the
        # pieces are read from the END of the path. Searching the path for
        # geo_level would pick the wrong folder if DATA_DIR itself contained a
        # folder with the same name.
        year_dir = os.path.dirname(fp)
        year = os.path.basename(year_dir)
        region = os.path.basename(os.path.dirname(year_dir))
        quarter = os.path.splitext(os.path.basename(fp))[0]

        try:
            year_num, quarter_num = int(year), int(quarter)
        except ValueError:
            print(f"--> INFO: Skipping {fp}: year/quarter in the path are not numbers.")
            continue

        try:
            # Explicit encoding: the platform default is not always UTF-8.
            with open(fp, 'r', encoding='utf-8') as f:
                obj = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError) as err:
            print(f"--> INFO: Skipping unreadable file {fp}: {err}")
            continue

        # Use the "data" section when the file has one, otherwise the whole object.
        data = obj.get("data", obj) if isinstance(obj, dict) else obj
        if isinstance(data, dict):
            data = [data]
        if not isinstance(data, list):
            continue  # null or a bare value: nothing to turn into rows

        for entry in data:
            if isinstance(entry, dict):
                records.append({**entry, geo_level: region, "year": year_num, "quarter": quarter_num})

    df = pd.DataFrame(records)
    if not df.empty:
        # Normalise the column names: no outer spaces, lower-case, underscores.
        df.columns = [str(c).strip().lower().replace(" ", "_") for c in df.columns]
        # This line asks, "Do the columns 'year' AND 'quarter' both exist in my table?"
        if {"year", "quarter"}.issubset(df.columns):
            # Translates the quarter number into the first month of that quarter.
            q2m = {1: "01", 2: "04", 3: "07", 4: "10"}
            # This is where the new "date" column is built and added to the table.
            df["date"] = pd.to_datetime(
                df["year"].astype(str) + "-" + df["quarter"].map(q2m) + "-01"
            )
    return df

print("Helper function 'load_regional' has been updated and is now ready to use.")

Helper function 'load_regional' has been updated and is now ready to use.


In [2]:
# Cell 2: Create Database and Tables
#
# Creates the 'phonepe_pulse' database (if needed), creates the nine target
# tables (if needed) and empties them. The db_* variables defined here are
# reused by every later cell.

import os
from getpass import getpass

import mysql.connector

# --- Your Database Credentials ---
db_host = "127.0.0.1"
db_user = "root"
# The password is NOT stored in the notebook. It is read from the
# PHONEPE_DB_PASS environment variable; if that is not set you are asked for it.
db_pass = os.environ.get("PHONEPE_DB_PASS") or getpass("MySQL password: ")
db_name = "phonepe_pulse"

con = None
cursor = None
try:
    # Connect without a specific database to create it first
    con = mysql.connector.connect(host=db_host, user=db_user, password=db_pass)
    cursor = con.cursor()
    cursor.execute(f"CREATE DATABASE IF NOT EXISTS {db_name}")
    con.database = db_name # Switch to the phonepe_pulse database
    print(f"Database '{db_name}' is ready.")

    # Dictionary holding all table names and their column definitions
    tables = {
        "aggregated_transaction": """(state VARCHAR(100), year INT, quarter INT, transaction_name VARCHAR(100), transaction_count BIGINT, transaction_amount DECIMAL(30, 2), date DATE)""",
        "aggregated_user": """(state VARCHAR(100), year INT, quarter INT, brand VARCHAR(100), brand_count BIGINT, brand_percentage DECIMAL(10, 5), date DATE)""",
        "aggregated_insurance": """(state VARCHAR(100), year INT, quarter INT, insurance_name VARCHAR(100), insurance_count BIGINT, insurance_amount DECIMAL(30, 2), date DATE)""",
        "map_transaction": """(state VARCHAR(100), year INT, quarter INT, district_name VARCHAR(100), transaction_count BIGINT, transaction_amount DECIMAL(30, 2), date DATE)""",
        "map_user": """(state VARCHAR(100), year INT, quarter INT, district_name VARCHAR(100), registered_users BIGINT, app_opens BIGINT, date DATE)""",
        "map_insurance": """(state VARCHAR(100), year INT, quarter INT, district_name VARCHAR(100), insurance_count BIGINT, insurance_amount DECIMAL(30, 2), date DATE)""",
        "top_transaction": """(state VARCHAR(100), year INT, quarter INT, entity_name VARCHAR(100), entity_type VARCHAR(20), transaction_count BIGINT, transaction_amount DECIMAL(30, 2), date DATE)""",
        "top_user": """(state VARCHAR(100), year INT, quarter INT, entity_name VARCHAR(100), entity_type VARCHAR(20), registered_users BIGINT, date DATE)""",
        "top_insurance": """(state VARCHAR(100), year INT, quarter INT, entity_name VARCHAR(100), entity_type VARCHAR(20), insurance_count BIGINT, insurance_amount DECIMAL(30, 2), date DATE)"""
    }

    # Loop through the dictionary to create and clear each table
    for table_name, columns in tables.items():
        cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} {columns};")
        cursor.execute(f"TRUNCATE TABLE {table_name};")

    con.commit()   # This executes and saves the changes to the database.
    # The count comes from the dictionary, so the message stays true if tables are added.
    print(f"All {len(tables)} tables have been successfully created and cleared of old data.")

except mysql.connector.Error as err:
    print(f"DATABASE SETUP ERROR: {err}")
finally:
    # Close only what was actually opened in THIS cell.
    if con is not None and con.is_connected():
        if cursor is not None:
            cursor.close()
        con.close()

Database 'phonepe_pulse' is ready.
All 9 tables have been successfully created and cleared of old data.


In [3]:
# Cell 3: Script for aggregated_transaction
#
# Reads the aggregated/transaction files, flattens every transaction category
# into one row and rebuilds the 'aggregated_transaction' table from those rows.

import pandas as pd
import mysql.connector


# --- Part A: Load and Process the Data Correctly ---
print("Step 1: Loading and processing data from local files...")
# Stays None when Part A fails, so Part B can never insert a frame left over
# from another cell.
final_df = None
try:
    df = load_regional("aggregated", "transaction", "state")
    processed_rows = []

    for _, row in df.iterrows():
        transactions = row.get('transactiondata')
        if not isinstance(transactions, list):
            continue  # column missing, or no list stored for this file
        for trans_data in transactions:
            # The figures sit in the first item of 'paymentInstruments';
            # a missing or empty list gives empty (NULL) figures.
            instruments = trans_data.get('paymentInstruments') or [{}]
            payment = instruments[0]
            processed_rows.append([
                row.get('state'),
                row.get('year'),
                row.get('quarter'),
                trans_data.get('name'),
                payment.get('count'),
                payment.get('amount'),
                row.get('date')
            ])
    final_df = pd.DataFrame(processed_rows, columns=['state', 'year', 'quarter', 'transaction_name', 'transaction_count', 'transaction_amount', 'date'])
    print("SUCCESS: Data processed into a DataFrame.")
except Exception as e:
    print(f"ERROR during data processing: {e}")


# --- Part B: Clear the Table and Insert the Correct Data ---
print("\nStep 2: Connecting to database, clearing old data, and inserting new data...")
if final_df is None or final_df.empty:
    # Nothing was loaded: do not wipe a table we cannot refill.
    print("SKIPPED: No processed data available; table 'aggregated_transaction' left untouched.")
else:
    con = None
    cursor = None
    try:
        con = mysql.connector.connect(host=db_host, user=db_user, password=db_pass, database=db_name)
        cursor = con.cursor()

        # We first clear the table to ensure no old, bad data remains
        cursor.execute("TRUNCATE TABLE aggregated_transaction;")

        query = "INSERT INTO aggregated_transaction VALUES (%s, %s, %s, %s, %s, %s, %s)"
        # Missing values are sent as None so they are stored as NULL instead of NaN.
        rows = final_df.astype(object).where(final_df.notna(), None)
        data_to_insert = [tuple(r) for r in rows.to_numpy()]
        cursor.executemany(query, data_to_insert)
        con.commit()
        print(f"SUCCESS: {cursor.rowcount} records inserted into the database.")
    except mysql.connector.Error as err:
        print(f"ERROR during database insertion: {err}")
    finally:
        if con is not None and con.is_connected():
            if cursor is not None:
                cursor.close()
            con.close()



Step 1: Loading and processing data from local files...
SUCCESS: Data processed into a DataFrame.

Step 2: Connecting to database, clearing old data, and inserting new data...
SUCCESS: 5034 records inserted into the database.


In [4]:
# Cell 4: Process & Insert into aggregated_user
#
# Flattens the per-brand device list of every aggregated/user file into one
# row per brand and rebuilds the 'aggregated_user' table from those rows.

print("\n--- Processing: aggregated_user ---")
df = load_regional("aggregated", "user", "state")
processed_rows = []
for _, row in df.iterrows():
    devices = row.get('usersbydevice')
    # A missing value may arrive as None or NaN; NaN is truthy, so the type is
    # checked instead of the truth value.
    if not isinstance(devices, list):
        continue
    for device_data in devices:
        processed_rows.append([row.get('state'), row.get('year'), row.get('quarter'), device_data.get('brand'), device_data.get('count'), device_data.get('percentage'), row.get('date')])

final_df = pd.DataFrame(processed_rows, columns=['state', 'year', 'quarter', 'brand', 'brand_count', 'brand_percentage', 'date'])

# Database Insertion
if final_df.empty:
    # Nothing was loaded: do not wipe a table we cannot refill.
    print("SKIPPED: No rows to insert; table 'aggregated_user' left untouched.")
else:
    con = None
    cursor = None
    try:
        con = mysql.connector.connect(host=db_host, user=db_user, password=db_pass, database=db_name)
        cursor = con.cursor()

        # Clear the table first so that re-running this cell does not duplicate rows.
        cursor.execute("TRUNCATE TABLE aggregated_user;")

        query = "INSERT INTO aggregated_user VALUES (%s, %s, %s, %s, %s, %s, %s)"
        # Missing values are sent as None so they are stored as NULL instead of NaN.
        rows = final_df.astype(object).where(final_df.notna(), None)
        data_to_insert = [tuple(r) for r in rows.to_numpy()]
        cursor.executemany(query, data_to_insert)
        con.commit()
        print(f"SUCCESS: {cursor.rowcount} records inserted.")
    except mysql.connector.Error as err:
        print(f"ERROR: {err}")
    finally:
        if con is not None and con.is_connected():
            if cursor is not None:
                cursor.close()
            con.close()


--- Processing: aggregated_user ---
SUCCESS: 6732 records inserted.


In [5]:
# Cell 5: Process & Insert into aggregated_insurance (Final Corrected Version)
#
# Flattens every insurance category of the aggregated/insurance files into
# one row and rebuilds the 'aggregated_insurance' table from those rows.

print("\n--- Processing: aggregated_insurance ---")
# We load the data first using our reliable helper function
df = load_regional("aggregated", "insurance", "state")
processed_rows = []

for _, row in df.iterrows():
    categories = row.get('transactiondata')
    if not isinstance(categories, list):
        continue  # column missing, or no list stored for this file
    for ins_data in categories:
        # The key is 'paymentInstruments' with a capital 'I'. The figures sit
        # in its first item; a missing or empty list gives empty (NULL) figures.
        instruments = ins_data.get('paymentInstruments') or [{}]
        payment = instruments[0]
        processed_rows.append([
            row.get('state'),
            row.get('year'),
            row.get('quarter'),
            ins_data.get('name'),
            payment.get('count'),
            payment.get('amount'),
            row.get('date')
        ])

final_df = pd.DataFrame(processed_rows, columns=['state', 'year', 'quarter', 'insurance_name', 'insurance_count', 'insurance_amount', 'date'])

# Database Insertion
if final_df.empty:
    # Nothing was loaded: do not wipe a table we cannot refill.
    print("SKIPPED: No rows to insert; table 'aggregated_insurance' left untouched.")
else:
    con = None
    cursor = None
    try:
        con = mysql.connector.connect(host=db_host, user=db_user, password=db_pass, database=db_name)
        cursor = con.cursor()

        # We first clear the table to ensure no old, bad data remains
        cursor.execute("TRUNCATE TABLE aggregated_insurance;")

        query = "INSERT INTO aggregated_insurance VALUES (%s, %s, %s, %s, %s, %s, %s)"
        # Missing values are sent as None so they are stored as NULL instead of NaN.
        rows = final_df.astype(object).where(final_df.notna(), None)
        data_to_insert = [tuple(r) for r in rows.to_numpy()]
        cursor.executemany(query, data_to_insert)
        con.commit()
        print(f"SUCCESS: {cursor.rowcount} records inserted with correct values.")
    except mysql.connector.Error as err:
        print(f"ERROR: {err}")
    finally:
        if con is not None and con.is_connected():
            if cursor is not None:
                cursor.close()
            con.close()


--- Processing: aggregated_insurance ---
SUCCESS: 682 records inserted with correct values.


In [6]:
# Cell 6: Process & Insert into map_transaction (Corrected)
#
# Flattens the per-district list of every map/transaction file into one row
# per district and rebuilds the 'map_transaction' table from those rows.

print("\n--- Processing: map_transaction ---")
# We load the data first using our reliable function
df = load_regional("map", "transaction", "state")
processed_rows = []

# The correct column name, as we discovered, is 'hoverdatalist'.
for _, row in df.iterrows():
    districts = row.get('hoverdatalist')
    if not isinstance(districts, list):
        continue  # column missing, or no list stored for this file
    for district_data in districts:
        # The figures sit in the first item of 'metric'; a missing or empty
        # list gives empty figures, and such rows are removed by dropna() below.
        metrics = district_data.get('metric') or [{}]
        metric = metrics[0]
        processed_rows.append([
            row.get('state'),
            row.get('year'),
            row.get('quarter'),
            district_data.get('name'),
            metric.get('count'),
            metric.get('amount'),
            row.get('date')
        ])

# Rows with any missing value are dropped before loading.
final_df = pd.DataFrame(processed_rows, columns=['state', 'year', 'quarter', 'district_name', 'transaction_count', 'transaction_amount', 'date']).dropna()

# Database Insertion (uses batching for large data)
if final_df.empty:
    # Nothing was loaded: do not wipe a table we cannot refill.
    print("SKIPPED: No rows to insert; table 'map_transaction' left untouched.")
else:
    con = None
    cursor = None
    try:
        con = mysql.connector.connect(host=db_host, user=db_user, password=db_pass, database=db_name)
        cursor = con.cursor()

        # Clear the table first so that re-running this cell does not duplicate rows.
        cursor.execute("TRUNCATE TABLE map_transaction;")

        query = "INSERT INTO map_transaction VALUES (%s, %s, %s, %s, %s, %s, %s)"
        data_to_insert = [tuple(r) for r in final_df.to_numpy()]
        batch_size = 1000
        total_inserted = 0
        # Each batch is committed on its own, so an error part-way through
        # leaves the earlier batches in the table.
        for start in range(0, len(data_to_insert), batch_size):
            cursor.executemany(query, data_to_insert[start:start + batch_size])
            con.commit()
            total_inserted += cursor.rowcount
        print(f"SUCCESS: {total_inserted} records inserted.")
    except mysql.connector.Error as err:
        print(f"ERROR: {err}")
    finally:
        if con is not None and con.is_connected():
            if cursor is not None:
                cursor.close()
            con.close()


--- Processing: map_transaction ---
SUCCESS: 20604 records inserted.


In [7]:
# Cell 7: Process & Insert into map_user
#
# Flattens the district -> figures mapping of every map/user file into one
# row per district and rebuilds the 'map_user' table from those rows.

print("\n--- Processing: map_user ---")
df = load_regional("map", "user", "state")
processed_rows = []
for _, row in df.iterrows():
    hover = row.get('hoverdata')
    if not isinstance(hover, dict):
        continue  # column missing, or no mapping stored for this file
    for district_name, metrics in hover.items():
        metrics = metrics or {}  # a null entry gives empty figures (dropped below)
        processed_rows.append([row.get('state'), row.get('year'), row.get('quarter'), district_name, metrics.get('registeredUsers'), metrics.get('appOpens'), row.get('date')])

# Rows with any missing value are dropped before loading.
final_df = pd.DataFrame(processed_rows, columns=['state', 'year', 'quarter', 'district_name', 'registered_users', 'app_opens', 'date']).dropna()

# Database Insertion
if final_df.empty:
    # Nothing was loaded: do not wipe a table we cannot refill.
    print("SKIPPED: No rows to insert; table 'map_user' left untouched.")
else:
    con = None
    cursor = None
    try:
        con = mysql.connector.connect(host=db_host, user=db_user, password=db_pass, database=db_name)
        cursor = con.cursor()

        # Clear the table first so that re-running this cell does not duplicate rows.
        cursor.execute("TRUNCATE TABLE map_user;")

        query = "INSERT INTO map_user VALUES (%s, %s, %s, %s, %s, %s, %s)"
        data_to_insert = [tuple(r) for r in final_df.to_numpy()]
        batch_size = 1000
        total_inserted = 0
        # Each batch is committed on its own, so an error part-way through
        # leaves the earlier batches in the table.
        for start in range(0, len(data_to_insert), batch_size):
            cursor.executemany(query, data_to_insert[start:start + batch_size])
            con.commit()
            total_inserted += cursor.rowcount
        print(f"SUCCESS: {total_inserted} records inserted.")
    except mysql.connector.Error as err:
        print(f"ERROR: {err}")
    finally:
        if con is not None and con.is_connected():
            if cursor is not None:
                cursor.close()
            con.close()


--- Processing: map_user ---
SUCCESS: 20608 records inserted.


In [8]:
# Cell 8: Process & Insert into map_insurance
#
# Turns every inner list found under data -> data of the map/insurance files
# into one row and rebuilds the 'map_insurance' table from those rows.

print("\n--- Processing: map_insurance ---")
df = load_regional("map", "insurance", "state")
processed_rows = []
for _, row in df.iterrows():
    outer = row.get('data')
    points = outer.get('data') if isinstance(outer, dict) else None
    if not isinstance(points, list):
        continue  # column missing, or no list stored for this file
    for district_list in points:
        # Item 3 is stored as the name and item 2 as the count.
        # NOTE(review): the amount is always written as 0 - presumably these
        # files carry no amount; confirm before using insurance_amount.
        if len(district_list) > 3:
            processed_rows.append([row.get('state'), row.get('year'), row.get('quarter'), district_list[3], district_list[2], 0, row.get('date')])

# Rows with any missing value are dropped before loading.
final_df = pd.DataFrame(processed_rows, columns=['state', 'year', 'quarter', 'district_name', 'insurance_count', 'insurance_amount', 'date']).dropna()

# Database Insertion
if final_df.empty:
    # Nothing was loaded: do not wipe a table we cannot refill.
    print("SKIPPED: No rows to insert; table 'map_insurance' left untouched.")
else:
    con = None
    cursor = None
    try:
        con = mysql.connector.connect(host=db_host, user=db_user, password=db_pass, database=db_name)
        cursor = con.cursor()

        # Clear the table first so that re-running this cell does not duplicate rows.
        cursor.execute("TRUNCATE TABLE map_insurance;")

        query = "INSERT INTO map_insurance VALUES (%s, %s, %s, %s, %s, %s, %s)"
        data_to_insert = [tuple(r) for r in final_df.to_numpy()]
        batch_size = 1000
        total_inserted = 0
        # Each batch is committed on its own, so an error part-way through
        # leaves the earlier batches in the table.
        for start in range(0, len(data_to_insert), batch_size):
            cursor.executemany(query, data_to_insert[start:start + batch_size])
            con.commit()
            total_inserted += cursor.rowcount
        print(f"SUCCESS: {total_inserted} records inserted.")
    except mysql.connector.Error as err:
        print(f"ERROR: {err}")
    finally:
        if con is not None and con.is_connected():
            if cursor is not None:
                cursor.close()
            con.close()


--- Processing: map_insurance ---
SUCCESS: 1043137 records inserted.


In [9]:
# Cell 9: Process & Insert into top_transaction (Final Corrected Version)
#
# Flattens the 'districts' and 'pincodes' lists of every top/transaction file
# into one row per entity and rebuilds the 'top_transaction' table.

print("\n--- Processing: top_transaction ---")
# We load the data first using our reliable helper function
df = load_regional("top", "transaction", "state")
processed_rows = []

# (column holding the list, label written to entity_type)
entity_sources = (('districts', 'district'), ('pincodes', 'pincode'))

for _, row in df.iterrows():
    # Districts first, then pincodes, for every file.
    for column, entity_type in entity_sources:
        entities = row.get(column)
        if not isinstance(entities, list):
            continue  # column missing, or no list stored for this file
        for entity in entities:
            metric = entity.get('metric') or {}
            processed_rows.append([
                row.get('state'),
                row.get('year'),
                row.get('quarter'),
                entity.get('entityName'),  # the name is stored under 'entityName'
                entity_type,
                metric.get('count'),
                metric.get('amount'),
                row.get('date')
            ])

# Rows with any missing value are dropped before loading.
final_df = pd.DataFrame(processed_rows, columns=['state', 'year', 'quarter', 'entity_name', 'entity_type', 'transaction_count', 'transaction_amount', 'date']).dropna()

# Database Insertion (uses batching)
if final_df.empty:
    # Nothing was loaded: do not wipe a table we cannot refill.
    print("SKIPPED: No rows to insert; table 'top_transaction' left untouched.")
else:
    con = None
    cursor = None
    try:
        con = mysql.connector.connect(host=db_host, user=db_user, password=db_pass, database=db_name)
        cursor = con.cursor()

        # Clear the table first so that re-running this cell does not duplicate rows.
        cursor.execute("TRUNCATE TABLE top_transaction;")

        query = "INSERT INTO top_transaction VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
        data_to_insert = [tuple(r) for r in final_df.to_numpy()]
        batch_size = 1000
        total_inserted = 0
        # Each batch is committed on its own, so an error part-way through
        # leaves the earlier batches in the table.
        for start in range(0, len(data_to_insert), batch_size):
            cursor.executemany(query, data_to_insert[start:start + batch_size])
            con.commit()
            total_inserted += cursor.rowcount
        print(f"SUCCESS: {total_inserted} records inserted.")
    except mysql.connector.Error as err:
        print(f"ERROR: {err}")
    finally:
        if con is not None and con.is_connected():
            if cursor is not None:
                cursor.close()
            con.close()


--- Processing: top_transaction ---
SUCCESS: 18293 records inserted.


In [10]:
# Cell 10: Process & Insert into top_user (Corrected)
#
# Flattens the 'districts' and 'pincodes' lists of every top/user file into
# one row per entity and rebuilds the 'top_user' table from those rows.

print("\n--- Processing: top_user ---")
# We load the data first using our reliable helper function
df = load_regional("top", "user", "state")
processed_rows = []

# (column holding the list, label written to entity_type)
entity_sources = (('districts', 'district'), ('pincodes', 'pincode'))

for _, row in df.iterrows():
    # Districts first, then pincodes, for every file.
    for column, entity_type in entity_sources:
        entities = row.get(column)
        if not isinstance(entities, list):
            continue  # column missing, or no list stored for this file
        for entity in entities:
            processed_rows.append([
                row.get('state'),
                row.get('year'),
                row.get('quarter'),
                entity.get('name'),  # here the name is stored under 'name'
                entity_type,
                entity.get('registeredUsers'),
                row.get('date')
            ])

# Rows with any missing value are dropped before loading.
final_df = pd.DataFrame(processed_rows, columns=['state', 'year', 'quarter', 'entity_name', 'entity_type', 'registered_users', 'date']).dropna()

# Database Insertion (uses batching)
if final_df.empty:
    # Nothing was loaded: do not wipe a table we cannot refill.
    print("SKIPPED: No rows to insert; table 'top_user' left untouched.")
else:
    con = None
    cursor = None
    try:
        con = mysql.connector.connect(host=db_host, user=db_user, password=db_pass, database=db_name)
        cursor = con.cursor()

        # Clear the table first so that re-running this cell does not duplicate rows.
        cursor.execute("TRUNCATE TABLE top_user;")

        query = "INSERT INTO top_user VALUES (%s, %s, %s, %s, %s, %s, %s)"
        data_to_insert = [tuple(r) for r in final_df.to_numpy()]
        batch_size = 1000
        total_inserted = 0
        # Each batch is committed on its own, so an error part-way through
        # leaves the earlier batches in the table.
        for start in range(0, len(data_to_insert), batch_size):
            cursor.executemany(query, data_to_insert[start:start + batch_size])
            con.commit()
            total_inserted += cursor.rowcount
        print(f"SUCCESS: {total_inserted} records inserted.")
    except mysql.connector.Error as err:
        print(f"ERROR: {err}")
    finally:
        if con is not None and con.is_connected():
            if cursor is not None:
                cursor.close()
            # The connection must really be closed; a bare `con` does nothing.
            con.close()


--- Processing: top_user ---
SUCCESS: 18296 records inserted.


In [11]:
# Cell 11: Process & Insert into top_insurance (Final Corrected Version)
#
# Flattens the 'districts' and 'pincodes' lists of every top/insurance file
# into one row per entity and rebuilds the 'top_insurance' table.

print("\n--- Processing: top_insurance ---")
# We load the data first using our reliable helper function
df = load_regional("top", "insurance", "state")
processed_rows = []

# (column holding the list, label written to entity_type)
entity_sources = (('districts', 'district'), ('pincodes', 'pincode'))

for _, row in df.iterrows():
    # Districts first, then pincodes, for every file.
    for column, entity_type in entity_sources:
        entities = row.get(column)
        if not isinstance(entities, list):
            continue  # column missing, or no list stored for this file
        for entity in entities:
            metric = entity.get('metric') or {}
            processed_rows.append([
                row.get('state'),
                row.get('year'),
                row.get('quarter'),
                entity.get('entityName'),  # the name is stored under 'entityName'
                entity_type,
                metric.get('count'),
                metric.get('amount'),
                row.get('date')
            ])

# Rows with any missing value are dropped before loading.
final_df = pd.DataFrame(processed_rows, columns=['state', 'year', 'quarter', 'entity_name', 'entity_type', 'insurance_count', 'insurance_amount', 'date']).dropna()

# Database Insertion (uses batching)
if final_df.empty:
    # Nothing was loaded: do not wipe a table we cannot refill.
    print("SKIPPED: No rows to insert; table 'top_insurance' left untouched.")
else:
    con = None
    cursor = None
    try:
        con = mysql.connector.connect(host=db_host, user=db_user, password=db_pass, database=db_name)
        cursor = con.cursor()

        # Clear the table first so that re-running this cell does not duplicate rows.
        cursor.execute("TRUNCATE TABLE top_insurance;")

        query = "INSERT INTO top_insurance VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"

        data_to_insert = [tuple(r) for r in final_df.to_numpy()]
        batch_size = 1000
        total_inserted = 0
        # Each batch is committed on its own, so an error part-way through
        # leaves the earlier batches in the table.
        for start in range(0, len(data_to_insert), batch_size):
            cursor.executemany(query, data_to_insert[start:start + batch_size])
            con.commit()
            total_inserted += cursor.rowcount

        print(f"SUCCESS: {total_inserted} records inserted.")
    except mysql.connector.Error as err:
        print(f"ERROR: {err}")
    finally:
        if con is not None and con.is_connected():
            if cursor is not None:
                cursor.close()
            con.close()


--- Processing: top_insurance ---
SUCCESS: 12273 records inserted.
