In [1]:
from git import Repo
import pandas as pd
import json
import os
import mysql.connector as msql 
from mysql.connector import Error
import sys

## DATA CLONE

In [3]:
def data_clone():
    repo_url = "https://github.com/PhonePe/pulse.git"

    repo_name = os.path.basename(repo_url).removesuffix(".git")
    clone_path = os.path.join(os.getcwd(),repo_name)

    if not os.path.exists(clone_path):
        Repo.clone_from(repo_url, clone_path)
        print(f"Data Cloned at {clone_path}")
    else:
        print(f"Data already cloned at {clone_path}")

In [4]:
data_clone()

Data already cloned at C:\Users\snega\OneDrive\Desktop\Data_Science\Phonpe\pulse


## DATA EXTRACT AND TRANSFORM

In [6]:
root_state_dir = []
def rename_directories(path):
    for root, dirs, files in os.walk(path):
        # Rename state directories according to geojson file
        if os.path.basename(root) == 'state':
            for state_name in dirs:
                old_path =  os.path.join(root, state_name)
                state_name.title().replace('-', ' ')
                if state_name.startswith('Andaman'):
                    state_name = 'Andaman & Nicobar'
                elif state_name.startswith('Jammu'):
                    state_name = 'Jammu & Kashmir'
                elif state_name.startswith('Dadra'):
                    state_name = 'Dadra and Nagar Haveli and Daman and Diu'  
                new_path = os.path.join(root, state_name)
                os.rename(old_path, new_path)
                   
        # To get all state basepath
        if os.path.basename(root) == 'state' and root not in root_state_dir:
            root_state_dir.append(root.replace('\\','/'))

In [7]:
rename_directories(os.getcwd())
print("All State Directories renamed successfully. \n\nBase state directory paths:")
for root in root_state_dir:
    print(root)

All State Directories renamed successfully. 

Base state directory paths:
C:/Users/snega/OneDrive/Desktop/Data_Science/Phonpe/pulse/data/aggregated/insurance/country/india/state
C:/Users/snega/OneDrive/Desktop/Data_Science/Phonpe/pulse/data/aggregated/transaction/country/india/state
C:/Users/snega/OneDrive/Desktop/Data_Science/Phonpe/pulse/data/aggregated/user/country/india/state
C:/Users/snega/OneDrive/Desktop/Data_Science/Phonpe/pulse/data/map/insurance/country/india/state
C:/Users/snega/OneDrive/Desktop/Data_Science/Phonpe/pulse/data/map/insurance/hover/country/india/state
C:/Users/snega/OneDrive/Desktop/Data_Science/Phonpe/pulse/data/map/transaction/hover/country/india/state
C:/Users/snega/OneDrive/Desktop/Data_Science/Phonpe/pulse/data/map/user/hover/country/india/state
C:/Users/snega/OneDrive/Desktop/Data_Science/Phonpe/pulse/data/top/insurance/country/india/state
C:/Users/snega/OneDrive/Desktop/Data_Science/Phonpe/pulse/data/top/transaction/country/india/state
C:/Users/snega/One

In [8]:
class data_extriform:
    def __init__(self):
        pass

    # Aggregated_user: Holds aggregated user-related data
    def aggregated_user(self):
        state_path = "C:/Users/snega/OneDrive/Desktop/Data_Science/Phonpe/pulse/data/aggregated/user/country/india/state"
        state_list = os.listdir(state_path)
        
        aggr_user_dict = {"State" : [], "Year" : [], "Quarter" : [],
                          "Brand" : [], "User_Count" : [], "User_Percentage" : []}
        
        for state in state_list:
            year_path = os.path.join(state_path, state).replace('\\', '/')
            year_list = os.listdir(year_path)

            for year in year_list:
                quarter_path = os.path.join(year_path, year).replace('\\', '/')
                quarter_list = os.listdir(quarter_path)

                for quarter in quarter_list:
                    json_file_path = os.path.join(quarter_path, quarter).replace('\\', '/')
                    f = open(json_file_path, "r")
                    df = json.load(f)

                    try:
                        for user_data in df['data']['usersByDevice']:
                            brand = user_data['brand']
                            count = user_data['count']
                            percent = user_data['percentage']
                            
                            # Append to aggr_user_dict
                            aggr_user_dict["State"].append(state)
                            aggr_user_dict["Year"].append(year)
                            aggr_user_dict["Quarter"].append('Q'+quarter[0])
                            aggr_user_dict["Brand"].append(brand)
                            aggr_user_dict["User_Count"].append(count)
                            aggr_user_dict["User_Percentage"].append(percent)
                    except:
                        pass
        aggr_user_df = pd.DataFrame(aggr_user_dict)
        aggr_user_df.to_csv("Pulse_Transformed/aggregated_user.csv",index=False)
        return aggr_user_df
    
    # Aggregated_transaction : Contains aggregated values for map-related data.
    def aggregated_transaction(self):
        state_path = "C:/Users/snega/OneDrive/Desktop/Data_Science/Phonpe/pulse/data/aggregated/transaction/country/india/state"
        state_list = os.listdir(state_path)

        aggr_trans_dict = {"State" : [], "Year" : [], "Quarter" : [],
                           "Transaction_Type" : [], "Transaction_Count" : [], "Transaction_Amount" :[]}
        
        for state in state_list:
            year_path = os.path.join(state_path, state).replace('\\', '/')
            year_list = os.listdir(year_path)

            for year in year_list:
                quarter_path = os.path.join(year_path, year).replace('\\', '/')
                quarter_list = os.listdir(quarter_path)

                for quarter in quarter_list:
                    json_file_path = os.path.join(quarter_path, quarter).replace('\\', '/')
                    f = open(json_file_path, "r")
                    df = json.load(f)

                    try:
                        for transaction_data in df['data']['transactionData']:
                            name = transaction_data['name']
                            count = transaction_data['paymentInstruments'][0]['count']
                            amount = transaction_data['paymentInstruments'][0]['amount']
                            
                            # Append to aggr_trans_dict
                            aggr_trans_dict["State"].append(state)
                            aggr_trans_dict["Year"].append(year)
                            aggr_trans_dict["Quarter"].append('Q'+quarter[0])
                            aggr_trans_dict["Transaction_Type"].append(name)
                            aggr_trans_dict["Transaction_Count"].append(count)
                            aggr_trans_dict["Transaction_Amount"].append(amount)
                    except:
                        pass
        aggr_trans_df = pd.DataFrame(aggr_trans_dict)
        aggr_trans_df.to_csv("Pulse_Transformed/aggregated_transaction.csv", index=False)
        return aggr_trans_df

    # Aggregated_insurance: Stores aggregated insurance-related data.
    def aggregated_insurance(self):
        state_path = "C:/Users/snega/OneDrive/Desktop/Data_Science/Phonpe/pulse/data/aggregated/insurance/country/india/state"
        state_list = os.listdir(state_path)

        aggr_ins_dict = {"State" : [], "Year" : [], "Quarter" : [],
                         "Type" : [], "Insurance_Count" : [], "Insurance_Amount" : []}

        for state in state_list:
            year_path = os.path.join(state_path, state).replace('\\', '/')
            year_list = os.listdir(year_path)

            for year in year_list:
                quarter_path = os.path.join(year_path, year).replace('\\', '/')
                quarter_list = os.listdir(quarter_path)

                for quarter in quarter_list:
                    json_file_path = os.path.join(quarter_path, quarter).replace('\\', '/')
                    f = open(json_file_path, "r")
                    df = json.load(f)
                    
                    try:
                        for ins_data in df['data']['transactionData']:
                            types = ins_data['name']
                            count = ins_data['paymentInstruments'][0]['count']
                            amount = ins_data['paymentInstruments'][0]['amount']
    
                            # Append to aggr_ins_dict
                            aggr_ins_dict["State"].append(state)
                            aggr_ins_dict["Year"].append(year)
                            aggr_ins_dict["Quarter"].append('Q'+quarter[0])
                            aggr_ins_dict["Type"].append(types)
                            aggr_ins_dict["Insurance_Count"].append(count)
                            aggr_ins_dict["Insurance_Amount"].append(amount)
                    except:
                        pass
        aggr_ins_df = pd.DataFrame(aggr_ins_dict)
        aggr_ins_df.to_csv("Pulse_Transformed/aggregated_insurance.csv", index=False)
        return aggr_ins_df
                    
            
    # Map_user: Contains mapping information for users.
    def map_user(self):
        state_path = "C:/Users/snega/OneDrive/Desktop/Data_Science/Phonpe/pulse/data/map/user/hover/country/india/state"
        state_list = os.listdir(state_path)

        map_user_dict = {"State" : [], "Year" : [], "Quarter" : [],
                         "District" : [], "Registered_Users" : [], "AppOpen_Count" : []}
        
        for state in state_list:
            year_path = os.path.join(state_path, state).replace('\\', '/')
            year_list = os.listdir(year_path)

            for year in year_list:
                quarter_path = os.path.join(year_path, year).replace('\\', '/')
                quarter_list = os.listdir(quarter_path)

                for quarter in quarter_list:
                    json_file_path = os.path.join(quarter_path, quarter).replace('\\', '/')
                    f = open(json_file_path, "r")
                    df = json.load(f)

                    try:
                        for district_key, user_data_value in df['data']['hoverData'].items():
                            district = district_key.title().replace(' District', '')
                            users = user_data_value['registeredUsers']
                            app_count = user_data_value['appOpens']

                            # Append to map_user_dict
                            map_user_dict["State"].append(state)
                            map_user_dict["Year"].append(year)
                            map_user_dict["Quarter"].append('Q'+quarter[0])
                            map_user_dict["District"].append(district)
                            map_user_dict["Registered_Users"].append(users)
                            map_user_dict["AppOpen_Count"].append(app_count)
                    except:
                        pass
        map_user_df = pd.DataFrame(map_user_dict)
        map_user_df.to_csv("Pulse_Transformed/map_user.csv", index=False)
        return map_user_df
    
    # Map_map: Holds mapping values for total amounts at state and district levels.
    def map_transaction(self):
        state_path = "C:/Users/snega/OneDrive/Desktop/Data_Science/Phonpe/pulse/data/map/transaction/hover/country/india/state"
        state_list = os.listdir(state_path)

        map_trans_dict = {"State" : [], "Year" : [], "Quarter" : [],
                           "District" : [], "Transaction_Count" : [], "Transaction_Amount" : []}
        
        for state in state_list:
            year_path = os.path.join(state_path, state).replace('\\', '/')
            year_list = os.listdir(year_path)

            for year in year_list:
                quarter_path = os.path.join(year_path, year).replace('\\', '/')
                quarter_list = os.listdir(quarter_path)

                for quarter in quarter_list:
                    json_file_path = os.path.join(quarter_path, quarter).replace('\\', '/')
                    f = open(json_file_path, "r")
                    df = json.load(f)

                    try:
                        for trans_value in df['data']['hoverDataList']:
                            district = trans_value['name'].title().replace(' District', '')
                            count = trans_value['metric'][0]['count']
                            amount = trans_value['metric'][0]['amount']

                            # Append to map_trans_dict
                            map_trans_dict["State"].append(state)
                            map_trans_dict["Year"].append(year)
                            map_trans_dict["Quarter"].append('Q'+quarter[0])
                            map_trans_dict["District"].append(district)
                            map_trans_dict["Transaction_Count"].append(count)
                            map_trans_dict["Transaction_Amount"].append(amount)
                    except:
                        pass
        map_trans_df = pd.DataFrame(map_trans_dict)
        map_trans_df.to_csv("Pulse_Transformed/map_transaction.csv", index=False)
        return map_trans_df


    # Map_insurance: Includes mapping information related to insurance.
    def map_insurance(self):
        state_path = "C:/Users/snega/OneDrive/Desktop/Data_Science/Phonpe/pulse/data/map/insurance/hover/country/india/state"
        state_list = os.listdir(state_path)

        map_ins_dict = {"State" : [], "Year" : [], "Quarter" : [],
                         "District" : [], "Insurance_Count" : [], "Insurance_Amount" : []}

        for state in state_list:
            year_path = os.path.join(state_path, state).replace('\\', '/')
            year_list = os.listdir(year_path)

            for year in year_list:
                quarter_path = os.path.join(year_path, year).replace('\\', '/')
                quarter_list = os.listdir(quarter_path)

                for quarter in quarter_list:
                    json_file_path = os.path.join(quarter_path, quarter).replace('\\', '/')
                    df = pd.read_json(json_file_path)
                    
                    try:
                        for ins_data in df['data']['hoverDataList']:
                            district = ins_data['name'].title().replace(' District', '')
                            count = ins_data['metric'][0]['count']
                            amount = ins_data['metric'][0]['amount']
    
                            # Append to map_ins_dict
                            map_ins_dict["State"].append(state)
                            map_ins_dict["Year"].append(year)
                            map_ins_dict["Quarter"].append('Q'+quarter[0])
                            map_ins_dict["District"].append(district)
                            map_ins_dict["Insurance_Count"].append(count)
                            map_ins_dict["Insurance_Amount"].append(amount)
                    except:
                        pass
        map_ins_df = pd.DataFrame(map_ins_dict)
        map_ins_df.to_csv("Pulse_Transformed/map_insurance.csv", index=False)
        return map_ins_df

    # Top_user: Lists totals for the top users.
    def top_user_district(self):
        state_path = "pulse/data/top/user/country/india/state"
        state_list = os.listdir(state_path)

        top_user_district_dict = {"State" : [], "Year" : [], "Quarter" : [],
                         "District" : [], "Registered_Users" : []}
        
        for state in state_list:
            year_path = os.path.join(state_path, state).replace('\\', '/')
            year_list = os.listdir(year_path)

            for year in year_list:
                quarter_path = os.path.join(year_path, year).replace('\\', '/')
                quarter_list = os.listdir(quarter_path)

                for quarter in quarter_list:
                    json_file_path = os.path.join(quarter_path, quarter).replace('\\', '/')
                    df = pd.read_json(json_file_path)

                    try:
                        for top_users in df['data']['districts']:
                            district = top_users['name'].title().replace(' District', '')
                            count = top_users['registeredUsers']

                            # Append to top_user_dict
                            top_user_district_dict["State"].append(state)
                            top_user_district_dict["Year"].append(year)
                            top_user_district_dict["Quarter"].append('Q'+quarter[0])
                            top_user_district_dict["District"].append(district)
                            top_user_district_dict["Registered_Users"].append(count)
                    except:
                        pass
        top_user_district_df = pd.DataFrame(top_user_district_dict)
        top_user_district_df.to_csv("Pulse_Transformed/top_user_district.csv", index=False)
        return top_user_district_df
    
    def top_user_pincode(self):
        state_path = "pulse/data/top/user/country/india/state"
        state_list = os.listdir(state_path)

        top_user_pincode_dict = {"State" : [], "Year" : [], "Quarter" : [],
                         "Pincode" : [], "Registered_Users" : []}
        
        for state in state_list:
            year_path = os.path.join(state_path, state).replace('\\', '/')
            year_list = os.listdir(year_path)

            for year in year_list:
                quarter_path = os.path.join(year_path, year).replace('\\', '/')
                quarter_list = os.listdir(quarter_path)

                for quarter in quarter_list:
                    json_file_path = os.path.join(quarter_path, quarter).replace('\\', '/')
                    df = pd.read_json(json_file_path)

                    try:
                        for top_users in df['data']['pincodes']:
                            code = top_users['name']
                            count = top_users['registeredUsers']

                            # Append to top_user_pincode_dict
                            top_user_pincode_dict["State"].append(state)
                            top_user_pincode_dict["Year"].append(year)
                            top_user_pincode_dict["Quarter"].append('Q'+quarter[0])
                            top_user_pincode_dict["Pincode"].append(code)
                            top_user_pincode_dict["Registered_Users"].append(count)
                    except:
                        pass
        top_user_pincode_df = pd.DataFrame(top_user_pincode_dict)
        top_user_pincode_df.to_csv("Pulse_Transformed/top_user_pincode.csv", index=False)
        return top_user_pincode_df
    
    # Top_map: Contains totals for the top states, districts, and pin codes.
    def top_transaction_district(self):
        state_path = "pulse/data/top/transaction/country/india/state"
        state_list = os.listdir(state_path)

        top_transaction_district_dict = {"State" : [], "Year" : [], "Quarter" : [],
                                        "District" : [], "Transaction_Count" : [], "Transaction_Amount" : []}
        
        for state in state_list:
            year_path = os.path.join(state_path, state).replace('\\', '/')
            year_list = os.listdir(year_path)

            for year in year_list:
                quarter_path = os.path.join(year_path, year).replace('\\', '/')
                quarter_list = os.listdir(quarter_path)

                for quarter in quarter_list:
                    json_file_path = os.path.join(quarter_path, quarter).replace('\\', '/')
                    df = pd.read_json(json_file_path) 

                    try:
                        for top_trans in df['data']['districts']: 
                            district = top_trans['entityName'].title().replace(' District', '')
                            count = top_trans['metric']['count']
                            amount = top_trans['metric']['amount']

                            # Append to top_trans_district_dict
                            top_transaction_district_dict["State"].append(state)
                            top_transaction_district_dict["Year"].append(year)
                            top_transaction_district_dict["Quarter"].append('Q'+quarter[0])
                            top_transaction_district_dict["District"].append(district)
                            top_transaction_district_dict["Transaction_Count"].append(count)
                            top_transaction_district_dict["Transaction_Amount"].append(amount)
                    except:
                        pass
        top_transaction_district_df = pd.DataFrame(top_transaction_district_dict)
        top_transaction_district_df.to_csv("Pulse_Transformed/top_transaction_district.csv", index=False)
        return top_transaction_district_df
    
    def top_transaction_pincode(self):
        state_path = "pulse/data/top/transaction/country/india/state"
        state_list = os.listdir(state_path)

        top_transaction_pincode_dict = {"State" : [], "Year" : [], "Quarter" : [],
                                        "Pincode" : [], "Transaction_Count" : [], "Transaction_Amount" : []}
        
        for state in state_list:
            year_path = os.path.join(state_path, state).replace('\\', '/')
            year_list = os.listdir(year_path)

            for year in year_list:
                quarter_path = os.path.join(year_path, year).replace('\\', '/')
                quarter_list = os.listdir(quarter_path)

                for quarter in quarter_list:
                    json_file_path = os.path.join(quarter_path, quarter).replace('\\', '/')
                    df = pd.read_json(json_file_path) 

                    try:
                        for top_trans in df['data']['pincodes']: 
                            pincode = top_trans['entityName']
                            count = top_trans['metric']['count']
                            amount = top_trans['metric']['amount']

                            # Append to top_trans_district_dict
                            top_transaction_pincode_dict["State"].append(state)
                            top_transaction_pincode_dict["Year"].append(year)
                            top_transaction_pincode_dict["Quarter"].append('Q'+quarter[0])
                            top_transaction_pincode_dict["Pincode"].append(pincode)
                            top_transaction_pincode_dict["Transaction_Count"].append(count)
                            top_transaction_pincode_dict["Transaction_Amount"].append(amount)
                    except:
                        pass
        top_transaction_pincode_df = pd.DataFrame(top_transaction_pincode_dict)
        top_transaction_pincode_df.to_csv("Pulse_Transformed/top_transaction_pincode.csv", index=False)
        return top_transaction_pincode_df

    # Top_insurance: Lists totals for the top insurance categories
    def top_insurance_district(self):
        state_path = "C:/Users/snega/OneDrive/Desktop/Data_Science/Phonpe/pulse/data/top/insurance/country/india/state"
        state_list = os.listdir(state_path)

        top_insurance_district_dict = {"State" : [], "Year" : [], "Quarter" : [],
                                        "District" : [], "Insurance_Count" : [], "Insurance_Amount" : []}
        
        for state in state_list:
            year_path = os.path.join(state_path, state).replace('\\', '/')
            year_list = os.listdir(year_path)

            for year in year_list:
                quarter_path = os.path.join(year_path, year).replace('\\', '/')
                quarter_list = os.listdir(quarter_path)

                for quarter in quarter_list:
                    json_file_path = os.path.join(quarter_path, quarter).replace('\\', '/')
                    df = pd.read_json(json_file_path) 

                    try:
                        for top_ins in df['data']['districts']: 
                            district = top_ins['entityName'].title().replace(' District', '')
                            count = top_ins['metric']['count']
                            amount = top_ins['metric']['amount']

                            # Append to top_trans_district_dict
                            top_insurance_district_dict["State"].append(state)
                            top_insurance_district_dict["Year"].append(year)
                            top_insurance_district_dict["Quarter"].append('Q'+quarter[0])
                            top_insurance_district_dict["District"].append(district)
                            top_insurance_district_dict["Insurance_Count"].append(count)
                            top_insurance_district_dict["Insurance_Amount"].append(amount)
                    except:
                        pass
        top_insurance_district_df = pd.DataFrame(top_insurance_district_dict)
        top_insurance_district_df.to_csv("Pulse_Transformed/top_insurance_district.csv", index=False)
        return top_insurance_district_df

    def top_insurance_pincode(self):
        state_path = "C:/Users/snega/OneDrive/Desktop/Data_Science/Phonpe/pulse/data/top/insurance/country/india/state"
        state_list = os.listdir(state_path)

        top_insurance_pincode_dict = {"State" : [], "Year" : [], "Quarter" : [],
                                      "Pincode" : [], "Insurance_Count" : [], "Insurance_Amount" : []}
        
        for state in state_list:
            year_path = os.path.join(state_path, state).replace('\\', '/')
            year_list = os.listdir(year_path)

            for year in year_list:
                quarter_path = os.path.join(year_path, year).replace('\\', '/')
                quarter_list = os.listdir(quarter_path)

                for quarter in quarter_list:
                    json_file_path = os.path.join(quarter_path, quarter).replace('\\', '/')
                    df = pd.read_json(json_file_path) 

                    try:
                        for top_ins in df['data']['pincodes']: 
                            pincode = top_ins['entityName']
                            count = top_ins['metric']['count']
                            amount = top_ins['metric']['amount']

                            # Append to top_trans_district_dict
                            top_insurance_pincode_dict["State"].append(state)
                            top_insurance_pincode_dict["Year"].append(year)
                            top_insurance_pincode_dict["Quarter"].append('Q'+quarter[0])
                            top_insurance_pincode_dict["Pincode"].append(pincode)
                            top_insurance_pincode_dict["Insurance_Count"].append(count)
                            top_insurance_pincode_dict["Insurance_Amount"].append(amount)
                    except:
                        pass
        top_insurance_pincode_df = pd.DataFrame(top_insurance_pincode_dict)
        top_insurance_pincode_df.to_csv("Pulse_Transformed/top_insurance_pincode.csv", index=False)
        return top_insurance_pincode_df

    # Latitude and Longitude Map
    def lat_long_map_statelevel(self):
        state_path = "C:/Users/snega/OneDrive/Desktop/Data_Science/Phonpe/pulse/data/map/insurance/country/india/state"
        state_list = os.listdir(state_path)

        lat_long_state_map_dict = {"State" : [], "District" : [], "Latitude" : [], "Longitude" : [], "Metric" : []}

        for state in state_list:
            year_path = os.path.join(state_path, state).replace('\\', '/')
            year_list = os.listdir(year_path)
            
            for year in year_list:
                quarter_path = os.path.join(year_path, year).replace('\\', '/')
                quarter_list = os.listdir(quarter_path)

                for quarter in quarter_list:
                    json_file_path = os.path.join(quarter_path, quarter).replace('\\', '/')
                    f = open(json_file_path, "r")
                    df = json.load(f)

                    try:
                        for loc in df['data']['data']['data']:    # Reading via JSON
                            lat, long, metric, label = loc

                            # Append to lat_long_state_map_dict
                            lat_long_state_map_dict["State"].append(state)
                            lat_long_state_map_dict["District"].append(label.title().replace(' District', ''))
                            lat_long_state_map_dict["Latitude"].append(lat)
                            lat_long_state_map_dict["Longitude"].append(long)
                            lat_long_state_map_dict["Metric"].append(metric)
                    except:
                        pass
        lat_long_state_map_df = pd.DataFrame(lat_long_state_map_dict)
        lat_long_state_map_df.to_csv("Pulse_Transformed/lat_long_state_map.csv", index=False)
        return lat_long_state_map_df
            
    def lat_long_map_countrylevel(self):
        country_path = "C:/Users/snega/OneDrive/Desktop/Data_Science/Phonpe/pulse/data/map/insurance/country/india"
        year_list = os.listdir(country_path)
     
        lat_long_india_map_dict = {"State" : [], "Latitude" : [], "Longitude" : [], "Metric" : []}
    
        for year in year_list:
            year_path = os.path.join(country_path, year).replace('\\', '/')
            if os.path.basename(year_path) != "state":
                quarter_list = os.listdir(year_path)
    
                for quarter in quarter_list:
                    json_file_path = os.path.join(year_path, quarter).replace('\\', '/')
                    f = open(json_file_path, "r")
                    df = json.load(f)
    
                    try:
                        for loc in df['data']['data']['data']:    # Reading via JSON
                            lat, long, metric, label = loc
                            
                            # Append to lat_long_map_dict
                            lat_long_india_map_dict["State"].append(label.title().replace('-', ' ').replace('&', 'and'))
                            lat_long_india_map_dict["Latitude"].append(lat)
                            lat_long_india_map_dict["Longitude"].append(long)
                            lat_long_india_map_dict["Metric"].append(metric)
                    except:
                        pass
    
        lat_long_india_map_df = pd.DataFrame(lat_long_india_map_dict)
        lat_long_india_map_df.to_csv("Pulse_Transformed/lat_long_india_map.csv", index=False)
        return lat_long_india_map_df 

In [9]:
print("Data Extract and Transform")
aggr_user_df = data_extriform().aggregated_user()
aggr_trans_df = data_extriform().aggregated_transaction()
aggr_ins_df = data_extriform().aggregated_insurance()
map_user_df = data_extriform().map_user()
map_trans_df = data_extriform().map_transaction()
map_ins_df = data_extriform().map_insurance()
top_user_districtwise_df = data_extriform().top_user_district()
top_user_pincodewise_df = data_extriform().top_user_pincode()
top_trans_districtwise_df = data_extriform().top_transaction_district()
top_trans_pincodewise_df = data_extriform().top_transaction_pincode()
top_ins_districtwise_df = data_extriform().top_insurance_district()
top_ins_pincodewise_df = data_extriform().top_insurance_pincode()
lat_long_state_df = data_extriform().lat_long_map_statelevel()
lat_long_india_df = data_extriform().lat_long_map_countrylevel()
print("JSON to DataFrame and CSV Files converted successfully")

Data Extract and Transform
JSON to DataFrame and CSV Files converted successfully


## DATA LOAD

In [11]:
class load_database:
    def __init__(self):
        pass
    def sql_table_creation(self):
        print("PHONEPE PULSE DB AND TABLE CREATION")
        try:
            conn = msql.connect(host="localhost", user="root", password="root", allow_local_infile=True)
            cursor = conn.cursor()
            print("* MYSQL Connection established")

            # DATABASE CREATION
            cursor.execute("CREATE SCHEMA IF NOT EXISTS project_phonepe_pulse")
            cursor.execute("USE project_phonepe_pulse")
            print("* Selected phonpe DB")

            # TABLE CREATION

            query = """
                    CREATE TABLE IF NOT EXISTS Aggregated_user(id INT AUTO_INCREMENT PRIMARY KEY,
                                                                state VARCHAR(100),
                                                                year INT,
                                                                quarter VARCHAR(2),
                                                                brand VARCHAR(100),
                                                                user_count INT,
                                                                user_percentage FLOAT);
                    CREATE TABLE IF NOT EXISTS Aggregated_transaction(id INT AUTO_INCREMENT PRIMARY KEY,
                                                                state VARCHAR(100),
                                                                year INT,
                                                                quarter VARCHAR(2),
                                                                transaction_type VARCHAR(100),
                                                                transaction_count INT UNSIGNED,
                                                                transaction_amount FLOAT);
                    CREATE TABLE IF NOT EXISTS Aggregated_insurance(id INT AUTO_INCREMENT PRIMARY KEY,
                                                                state VARCHAR(100),
                                                                year INT,
                                                                quarter VARCHAR(2),
                                                                type VARCHAR(100),
                                                                insurance_count INT UNSIGNED,
                                                                insurance_amount FLOAT);
                    CREATE TABLE IF NOT EXISTS Map_user(id INT AUTO_INCREMENT PRIMARY KEY,
                                                                state VARCHAR(100),
                                                                year INT,
                                                                quarter VARCHAR(2),
                                                                district VARCHAR(100),
                                                                registered_users INT,
                                                                appopen_count INT);
                    CREATE TABLE IF NOT EXISTS map_transaction(id INT AUTO_INCREMENT PRIMARY KEY,
                                                                state VARCHAR(100),
                                                                year INT,
                                                                quarter VARCHAR(2),
                                                                district VARCHAR(100),
                                                                transaction_count INT,
                                                                transaction_amount FLOAT);
                    CREATE TABLE IF NOT EXISTS Map_insurance(id INT AUTO_INCREMENT PRIMARY KEY,
                                                                state VARCHAR(100),
                                                                year INT,
                                                                quarter VARCHAR(2),
                                                                district VARCHAR(100),
                                                                insurance_count INT UNSIGNED,
                                                                insurance_amount FLOAT);
                    CREATE TABLE IF NOT EXISTS Top_user_districtwise(id INT AUTO_INCREMENT PRIMARY KEY,
                                                                state VARCHAR(100),
                                                                year INT,
                                                                quarter VARCHAR(2),
                                                                district VARCHAR(100),
                                                                registered_users INT);
                    CREATE TABLE IF NOT EXISTS Top_user_pincodewise(id INT AUTO_INCREMENT PRIMARY KEY,
                                                                state VARCHAR(100),
                                                                year INT,
                                                                quarter VARCHAR(2),
                                                                pincode VARCHAR(100),
                                                                registered_users INT);
                    CREATE TABLE IF NOT EXISTS Top_transaction_districtwise(id INT AUTO_INCREMENT PRIMARY KEY,
                                                                state VARCHAR(100),
                                                                year INT,
                                                                quarter VARCHAR(2),
                                                                district VARCHAR(100),
                                                                transaction_count INT,
                                                                transaction_amount FLOAT);
                    CREATE TABLE IF NOT EXISTS Top_transaction_pincodewise(id INT AUTO_INCREMENT PRIMARY KEY,
                                                                state VARCHAR(100),
                                                                year INT,
                                                                quarter VARCHAR(2),
                                                                pincode VARCHAR(100),
                                                                transaction_count INT,
                                                                transaction_amount FLOAT);
                    CREATE TABLE IF NOT EXISTS Top_insurance_districtwise(id INT AUTO_INCREMENT PRIMARY KEY,
                                                                state VARCHAR(100),
                                                                year INT,
                                                                quarter VARCHAR(2),
                                                                district VARCHAR(100),
                                                                insurance_count INT,
                                                                insurance_amount FLOAT);
                    CREATE TABLE IF NOT EXISTS Top_insurance_pincodewise(id INT AUTO_INCREMENT PRIMARY KEY,
                                                                state VARCHAR(100),
                                                                year INT,
                                                                quarter VARCHAR(2),
                                                                pincode VARCHAR(100),
                                                                insurance_count INT,
                                                                insurance_amount FLOAT);
                    CREATE TABLE IF NOT EXISTS State_level_location_metrics(id INT AUTO_INCREMENT PRIMARY KEY,
                                                                state VARCHAR(100),
                                                                district VARCHAR(100),
                                                                latitude FLOAT,
                                                                longitude FLOAT,
                                                                metric FLOAT);
                    CREATE TABLE IF NOT EXISTS India_level_location_metrics(id INT AUTO_INCREMENT PRIMARY KEY,
                                                                state VARCHAR(100),
                                                                latitude FLOAT,
                                                                longitude FLOAT,
                                                                metric FLOAT);
                    """
            for _ in cursor.execute(query, multi=True):
                pass
            conn.commit()
            print("* MYSQL phonepe_pulse database and table creation completed")
        except Error as e:
            tb = sys.exc_info()
            lineno = tb.tb_lineno
            print(f"* MYSQL DB & TABLE creation failed due to {e} at line number {lineno}")
        finally:
            if cursor:
                cursor.close()
            if conn:
                conn.close()

    def data_transfer(self):
        print("\nDATA INSERTION TO SQL TABLE")
        try:
            conn = msql.connect(host="localhost", user="root", password="root", database="project_phonepe_pulse")
            print("* MYSQL Database Connection established")

            cursor = conn.cursor()

            query = """TRUNCATE TABLE Aggregated_user;
                    TRUNCATE TABLE Aggregated_transaction;
                    TRUNCATE TABLE Aggregated_insurance;
                    TRUNCATE TABLE Map_user;
                    TRUNCATE TABLE Map_transaction;
                    TRUNCATE TABLE Map_insurance;
                    TRUNCATE TABLE Top_user_districtwise;
                    TRUNCATE TABLE Top_user_pincodewise;
                    TRUNCATE TABLE Top_transaction_districtwise;
                    TRUNCATE TABLE Top_transaction_pincodewise;
                    TRUNCATE TABLE Top_insurance_districtwise;
                    TRUNCATE TABLE Top_insurance_pincodewise;
                    TRUNCATE TABLE State_level_location_metrics;
                    TRUNCATE TABLE India_level_location_metrics
                    """
            
            for _ in cursor.execute(query, multi=True):
                pass
            conn.commit()
            print("* Truncate tables completed")

            # Map Dataframe to SQL Table Name
            map_df_dict = {"Aggregated_user" : aggr_user_df,
                            "Aggregated_transaction" : aggr_trans_df,
                            "Aggregated_insurance" : aggr_ins_df,
                            "Map_user" : map_user_df,
                            "Map_transaction" : map_trans_df,
                            "Map_insurance" : map_ins_df,
                            "Top_user_districtwise" : top_user_districtwise_df,
                            "Top_user_pincodewise" : top_user_pincodewise_df,
                            "Top_transaction_districtwise" : top_trans_districtwise_df,
                            "Top_transaction_pincodewise" : top_trans_pincodewise_df,
                            "Top_insurance_districtwise" : top_ins_districtwise_df,
                            "Top_insurance_pincodewise" : top_ins_pincodewise_df,
                            "State_level_location_metrics" : lat_long_state_df,
                            "India_level_location_metrics" : lat_long_india_df
                          }
            
            # Mapping Column Names to SQL Table Name
            column_name_dict = {}

            for table_name, data_f in map_df_dict.items():
                column_name_dict[table_name] = tuple([col.lower() for col in data_f.columns.tolist()])
        
            for table_name in map_df_dict.keys():
                curr_df = map_df_dict[table_name]
                columns = column_name_dict[table_name]
                val = ",".join(["%s"] * len(columns))
                query = f"INSERT INTO {table_name} ({', '.join(columns)}) values ({val})"
                data = [tuple(row) for row in curr_df.to_numpy()]
                print(f"\n  🚀 Inserting into table: {table_name}")
                print(f"  Query: {query}")
                print(f"  Sample row: {data[0]}")
                print(f"  Expected: {len(columns)} values, Got: {len(data[0])} values")
                if table_name == "State_level_location_metrics" or table_name == "India_level_location_metrics":
                    for i in range(0, len(data), 500):  # batches of 500
                        batch = data[i:i+500]
                        cursor.executemany(query, batch)
                        conn.commit()
                else:
                    cursor.executemany(query, data)
                    conn.commit()

            conn.commit()
            print("\n* Table data migration completed")
        except Error as e:
            import traceback, sys
            exc_type, exc_value, tb = sys.exc_info()
            lineno = tb.tb_lineno if tb else "unknown"
            print(f"* Table data insertion failed due to {e} at line number {lineno}")
        finally:
            if cursor:
                cursor.close()
            if conn:
                conn.close()

In [12]:
load_database().sql_table_creation()
load_database().data_transfer()

PHONEPE PULSE DB AND TABLE CREATION
* MYSQL Connection established
* Selected phonpe DB
* MYSQL phonepe_pulse database and table creation completed

DATA INSERTION TO SQL TABLE
* MYSQL Database Connection established
* Truncate tables completed

  🚀 Inserting into table: Aggregated_user
  Query: INSERT INTO Aggregated_user (state, year, quarter, brand, user_count, user_percentage) values (%s,%s,%s,%s,%s,%s)
  Sample row: ('Andaman & Nicobar', '2018', 'Q1', 'Xiaomi', 1665, 0.2470326409495549)
  Expected: 6 values, Got: 6 values

  🚀 Inserting into table: Aggregated_transaction
  Query: INSERT INTO Aggregated_transaction (state, year, quarter, transaction_type, transaction_count, transaction_amount) values (%s,%s,%s,%s,%s,%s)
  Sample row: ('Andaman & Nicobar', '2018', 'Q1', 'Recharge & bill payments', 4200, 1845307.4673655091)
  Expected: 6 values, Got: 6 values

  🚀 Inserting into table: Aggregated_insurance
  Query: INSERT INTO Aggregated_insurance (state, year, quarter, type, insuran

## DATAFRAME INFO

In [14]:
df_list = [key for key, val in globals().items() if isinstance(val, pd.DataFrame) and key.endswith('_df')]
print("Created DataFrame List:")
for dfs in df_list:
    print(f" {dfs}")

Created DataFrame List:
 aggr_user_df
 aggr_trans_df
 aggr_ins_df
 map_user_df
 map_trans_df
 map_ins_df
 top_user_districtwise_df
 top_user_pincodewise_df
 top_trans_districtwise_df
 top_trans_pincodewise_df
 top_ins_districtwise_df
 top_ins_pincodewise_df
 lat_long_state_df
 lat_long_india_df


In [15]:
print("DataFrame Info:")
for dfs in df_list:
    # get the actual DataFrame using its name
    df = globals()[dfs]
    print(f" {dfs} info : \n")
    df.info()
    print()

DataFrame Info:
 aggr_user_df info : 

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6732 entries, 0 to 6731
Data columns (total 6 columns):
 #   Column           Non-Null Count  Dtype  
---  ------           --------------  -----  
 0   State            6732 non-null   object 
 1   Year             6732 non-null   object 
 2   Quarter          6732 non-null   object 
 3   Brand            6732 non-null   object 
 4   User_Count       6732 non-null   int64  
 5   User_Percentage  6732 non-null   float64
dtypes: float64(1), int64(1), object(4)
memory usage: 315.7+ KB

 aggr_trans_df info : 

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5034 entries, 0 to 5033
Data columns (total 6 columns):
 #   Column              Non-Null Count  Dtype  
---  ------              --------------  -----  
 0   State               5034 non-null   object 
 1   Year                5034 non-null   object 
 2   Quarter             5034 non-null   object 
 3   Transaction_Type    5034 non-null   object

In [16]:
print("Duplicated values")
for dfs in df_list:
    df = globals()[dfs]
    print(f" Duplicates in {dfs} : {df.duplicated().sum()}")

Duplicated values
 Duplicates in aggr_user_df : 0
 Duplicates in aggr_trans_df : 0
 Duplicates in aggr_ins_df : 0
 Duplicates in map_user_df : 0
 Duplicates in map_trans_df : 0
 Duplicates in map_ins_df : 0
 Duplicates in top_user_districtwise_df : 0
 Duplicates in top_user_pincodewise_df : 0
 Duplicates in top_trans_districtwise_df : 0
 Duplicates in top_trans_pincodewise_df : 0
 Duplicates in top_ins_districtwise_df : 0
 Duplicates in top_ins_pincodewise_df : 0
 Duplicates in lat_long_state_df : 172281
 Duplicates in lat_long_india_df : 19682


In [17]:
print("NULL Value Count : ")
for dfs in df_list:
    df = globals()[dfs]
    print(f" NULL Values in {dfs} : \n{df.isnull().sum()}")

NULL Value Count : 
 NULL Values in aggr_user_df : 
State              0
Year               0
Quarter            0
Brand              0
User_Count         0
User_Percentage    0
dtype: int64
 NULL Values in aggr_trans_df : 
State                 0
Year                  0
Quarter               0
Transaction_Type      0
Transaction_Count     0
Transaction_Amount    0
dtype: int64
 NULL Values in aggr_ins_df : 
State               0
Year                0
Quarter             0
Type                0
Insurance_Count     0
Insurance_Amount    0
dtype: int64
 NULL Values in map_user_df : 
State               0
Year                0
Quarter             0
District            0
Registered_Users    0
AppOpen_Count       0
dtype: int64
 NULL Values in map_trans_df : 
State                 0
Year                  0
Quarter               0
District              0
Transaction_Count     0
Transaction_Amount    0
dtype: int64
 NULL Values in map_ins_df : 
State               0
Year                0
Quar

In [18]:
print("Columns of dataframe:")
for dfs in df_list:
    df = globals()[dfs]
    print(f" **{dfs} columns** : {df.columns.tolist()}")

Columns of dataframe:
 **aggr_user_df columns** : ['State', 'Year', 'Quarter', 'Brand', 'User_Count', 'User_Percentage']
 **aggr_trans_df columns** : ['State', 'Year', 'Quarter', 'Transaction_Type', 'Transaction_Count', 'Transaction_Amount']
 **aggr_ins_df columns** : ['State', 'Year', 'Quarter', 'Type', 'Insurance_Count', 'Insurance_Amount']
 **map_user_df columns** : ['State', 'Year', 'Quarter', 'District', 'Registered_Users', 'AppOpen_Count']
 **map_trans_df columns** : ['State', 'Year', 'Quarter', 'District', 'Transaction_Count', 'Transaction_Amount']
 **map_ins_df columns** : ['State', 'Year', 'Quarter', 'District', 'Insurance_Count', 'Insurance_Amount']
 **top_user_districtwise_df columns** : ['State', 'Year', 'Quarter', 'District', 'Registered_Users']
 **top_user_pincodewise_df columns** : ['State', 'Year', 'Quarter', 'Pincode', 'Registered_Users']
 **top_trans_districtwise_df columns** : ['State', 'Year', 'Quarter', 'District', 'Transaction_Count', 'Transaction_Amount']
 **top_