<h1><center>Data Extraction from PhonePe Pulse Repo</center></h1>

<b> Function Chronology </b><br>

<i>STEP 1</i>
<ul>
    <li>process()<i><b>[Entry point of the program]</b></i></li>
    <ul>
        <li>get_directory()</li>
        <li>iterate_through_files()</li>
        <ul>
            <li>read_json()</li>
            <li>perform_extraction() <i><b>[Dispatches to one of the functions below, based on the operation parameter]</b></i></li>
            <ul>
                <li>agg_insurance()</li>
                <li>agg_transaction()</li>
                <li>agg_user()</li>
                <li>map_insurance()</li>
                <li>map_transaction()</li>
                <li>map_user()</li>
                <li>top_insurance()</li>
                <li>top_transaction()</li>
                <li>top_user()</li>
                <ul>
                    <li>insert_state_year_quarter()<i><b>[Called by each of the extraction functions above to insert the state, year and quarter]</b></i></li>
                </ul>
            </ul>
        </ul>
        <li>save_to_csv()</li>
    </ul>
</ul>

<i>STEP 2</i>
<ul>
    <li>create_table()</li>
</ul>

@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@

<b>Importing Libraries</b>

In [20]:
#Import necessary libraries
import pandas as pd
import mysql.connector as sql  # mysql-connector-python
import os
import git  # GitPython, used only by the clone cell below
from pathlib import Path

<b>Clone Git Repository</b>

In [21]:
# #Specify the directories
# git_url = 'https://github.com/PhonePe/pulse.git'
# cloned_directory = 'data/pulse_data'

# #Clone
# git.Repo.clone_from(git_url, cloned_directory)

<h1><center>DATA PROCESSING</center></h1>

<b>Declaring all the directories and the data dictionaries</b>

1. Here we declare the directories from which the data is extracted. Since all the paths share the same prefix and suffix, we define a function that takes only the part that differs as a parameter and builds the full path with an f-string. <br><br>
2. The data dictionaries in which the extracted data is collected before being converted to DataFrames.

In [22]:
#Directories
def get_directory(path):
    return f'data/pulse_data/data/{path}/country/india/state'
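For example, the nine relative paths passed to `process()` later in the notebook resolve as shown below. The `os.path.isdir` check is an added sanity step (an assumption, not part of the original flow) that flags a path which does not exist yet, for instance when the repository has not been cloned.

```python
import os

def get_directory(path):
    # Same f-string as the notebook: only the varying middle segment is a parameter
    return f'data/pulse_data/data/{path}/country/india/state'

# Relative paths used by the process() calls further down
sub_paths = [
    'aggregated/insurance', 'aggregated/transaction', 'aggregated/user',
    'map/insurance/hover', 'map/transaction/hover', 'map/user/hover',
    'top/insurance', 'top/transaction', 'top/user',
]

for sub_path in sub_paths:
    full_path = get_directory(sub_path)
    # Report whether the resolved folder exists on disk
    status = 'ok' if os.path.isdir(full_path) else 'missing'
    print(f'{status:8}{full_path}')
```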

In [23]:
#Data Dictionaries
agg_insurance_data = {'States':[],'Years':[],'Quarters':[],'Policy_count':[],'Policy_amount':[]}
agg_transaction_data = {'States':[],'Years':[],'Quarters':[],'Transaction_type':[],'Transaction_count':[], 'Transaction_amount':[] }
agg_user_data = {'States':[],'Years':[],'Quarters':[],'Brands':[],'Registered_Users_per_brand':[],'Percentage':[]}

map_insurance_data = {'States':[],'Years':[],'Quarters':[],'District':[],'Policy_count':[],'Policy_amount':[]}
map_transaction_data = {'States':[],'Years':[],'Quarters':[],'District':[],'Transaction_count':[], 'Transaction_amount':[] }
map_user_data = {'States':[],'Years':[],'Quarters':[],'District':[],'Registered_Users':[],'App_opens':[]}

top_insurance_data = {'States':[],'Years':[],'Quarters':[],'Pincodes':[],'Policy_count':[],'Policy_amount':[]}
top_transaction_data = {'States':[],'Years':[],'Quarters':[],'Pincodes':[],'Transaction_count':[], 'Transaction_amount':[] }
top_user_data = {'States':[],'Years':[],'Quarters':[],'Pincodes':[],'Registered_Users':[]}
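Each data dictionary is column-oriented: one key per output column and one list per key. `pd.DataFrame(dict)` turns the keys into columns and the list positions into rows, which only works if every list has the same length. That is why each extraction function appends to all the fields of its dictionary (State, Year and Quarter included) exactly once per record. A small sketch with made-up values:

```python
import pandas as pd

# Hypothetical values, laid out like agg_insurance_data
sample = {'States': [], 'Years': [], 'Quarters': [], 'Policy_count': [], 'Policy_amount': []}
for count, amount in [(10, 2500.0), (4, 900.5)]:
    # Every field gets exactly one append per record, so the lists stay aligned
    sample['States'].append('Goa')
    sample['Years'].append(2021)
    sample['Quarters'].append(1)
    sample['Policy_count'].append(count)
    sample['Policy_amount'].append(amount)

sample_df = pd.DataFrame(sample)   # each key becomes a column, each list position a row
print(sample_df)

# A list that falls out of step makes the conversion fail
misaligned = dict(sample, Policy_count=[10])
try:
    pd.DataFrame(misaligned)
except ValueError as err:
    print('ValueError:', err)
```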

<b>Defining necessary functions</b>

1. read_json - Takes the path to a JSON file and returns the parsed data as a pandas DataFrame.

2. save_to_csv - Takes a dictionary and a file name, converts the dictionary to a pandas DataFrame and writes the DataFrame to a CSV file in data/csv_data.
It also returns the converted DataFrame.

In [24]:
#Read JSON
def read_json(path):
    return pd.read_json(path)

In [25]:
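#Save the dataframe to csv file
def save_to_csv(file_dict,filename):
    #Convert the dictionary to dataframe
    df = pd.DataFrame(file_dict)
    #Create the output folder if it is missing, since to_csv() does not create it
    os.makedirs('data/csv_data', exist_ok=True)
    df.to_csv(f'data/csv_data/{filename}', index=False)
    return df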
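`pd.read_json` works here because each quarter file is a single JSON object, but it builds a DataFrame whose index is the union of the nested keys. A minimal alternative sketch uses only the standard `json` module (a substitution, not what the notebook does) and returns plain dicts and lists, so indexing such as `year_json['data']['transactionData']` reads the same way. It also creates the output folder before writing, because `DataFrame.to_csv` does not create missing directories. The helper names `read_json_plain` and `save_to_csv_in` are hypothetical.

```python
import json
import os
import tempfile

import pandas as pd

def read_json_plain(path):
    # Hypothetical replacement for read_json(): returns nested dicts/lists
    with open(path, encoding='utf-8') as fh:
        return json.load(fh)

def save_to_csv_in(file_dict, filename, out_dir):
    # Same idea as save_to_csv(), with the output folder as a parameter
    os.makedirs(out_dir, exist_ok=True)          # to_csv will not create folders
    df = pd.DataFrame(file_dict)
    df.to_csv(os.path.join(out_dir, filename), index=False)
    return df

# Usage with a throwaway file holding a minimal JSON object
with tempfile.TemporaryDirectory() as tmp:
    sample_path = os.path.join(tmp, '1.json')
    with open(sample_path, 'w', encoding='utf-8') as fh:
        json.dump({'data': {'transactionData': []}}, fh)

    year_json = read_json_plain(sample_path)
    print(year_json['data']['transactionData'])   # []

    out = save_to_csv_in({'States': ['Goa'], 'Years': [2021]}, 'demo.csv',
                         os.path.join(tmp, 'csv_data'))
    print(os.path.exists(os.path.join(tmp, 'csv_data', 'demo.csv')))  # True
```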

<b>Extraction Functions</b>

1. This block contains the functions that extract data from a JSON file and append it to the corresponding data dictionary.
Their parameters are the state, year, quarter and the JSON data.

2. A helper function to insert the state, year and quarter, as these values are common throughout the data.

In [26]:
#Function to enter state,year and quarter as they are common in all the data
def insert_state_year_quarter(state,year,quarter,dict_file):
    #Folder names are hyphenated; replace() is a no-op when there is no '-'
    #title() capitalizes every word (e.g. 'Arunachal Pradesh'), capitalize() only the first
    state = state.replace('-',' ').title()
    dict_file['States'].append(state)

    dict_file['Years'].append(int(year))
    dict_file['Quarters'].append(quarter)
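`str.capitalize()` upper-cases only the first character of the whole string, while `str.title()` upper-cases the first letter of every word. The state names in `assets/india_states.geojson` (inspected in the last cell) use the every-word form, e.g. `'Arunachal Pradesh'`, so title case is the closer match. The folder names and the `normalize_state` helper below are illustrative.

```python
def normalize_state(folder_name):
    # Hyphens separate words in the folder name; title() capitalizes every word
    return folder_name.replace('-', ' ').title()

# Illustrative folder names in a lowercase, hyphen-separated style
for folder in ['arunachal-pradesh', 'andaman-&-nicobar-islands', 'goa']:
    spaced = folder.replace('-', ' ')
    # capitalize() only touches the very first character of the string
    print(f'{spaced.capitalize():28} -> {normalize_state(folder)}')
```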

In [27]:
#Extract aggregated insurance
def agg_insurance(state,year,quarter,year_json):
    data = year_json['data']['transactionData']
    if(data):
        for i in data:
            count = i['paymentInstruments'][0]['count']
            amount = i['paymentInstruments'][0]['amount']
            
            insert_state_year_quarter(state,year,quarter,agg_insurance_data)
            
            agg_insurance_data['Policy_count'].append(count)
            agg_insurance_data['Policy_amount'].append(amount)

In [28]:
#Extract aggregated transaction
def agg_transaction(state,year,quarter,year_json):
    data = year_json['data']['transactionData']
    if(data):
        for i in data:
            transaction_type = i['name']   #Avoid shadowing the built-in type()
            count = i['paymentInstruments'][0]['count']
            amount = i['paymentInstruments'][0]['amount']
            
            insert_state_year_quarter(state,year,quarter,agg_transaction_data)

            agg_transaction_data['Transaction_type'].append(transaction_type)
            agg_transaction_data['Transaction_count'].append(count)
            agg_transaction_data['Transaction_amount'].append(amount)
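To make the access pattern concrete, here is a hand-written sample shaped like the fields `agg_transaction()` reads (`data`, then `transactionData`, then `name` and `paymentInstruments[0]`). The values and category names are made up for illustration; only the key path is taken from the code above.

```python
# Hypothetical quarter file contents, trimmed to the keys agg_transaction() reads
year_json = {
    'data': {
        'transactionData': [
            {'name': 'Peer-to-peer payments',
             'paymentInstruments': [{'type': 'TOTAL', 'count': 120, 'amount': 45000.0}]},
            {'name': 'Recharge & bill payments',
             'paymentInstruments': [{'type': 'TOTAL', 'count': 80, 'amount': 9600.5}]},
        ]
    }
}

rows = []
for entry in year_json['data']['transactionData'] or []:   # a null list yields no rows
    instrument = entry['paymentInstruments'][0]              # only the first instrument is read
    rows.append((entry['name'], instrument['count'], instrument['amount']))

for row in rows:
    print(row)
```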

In [29]:
#Extract aggregated user
def agg_user(state,year,quarter,year_json):
    data = year_json['data']['usersByDevice']
    if(data):
        for i in data:
            brand = i['brand']
            users = i['count']
            percentage = i['percentage']
            
            insert_state_year_quarter(state,year,quarter,agg_user_data)
            
            agg_user_data['Brands'].append(brand)
            agg_user_data['Percentage'].append(percentage)
            agg_user_data['Registered_Users_per_brand'].append(users)


In [30]:
#Extract map insurance
def map_insurance(state,year,quarter,year_json):
    data = year_json['data']['hoverDataList']
    if(data):
        for i in data:
            count = i['metric'][0]['count']
            amount = i['metric'][0]['amount']
            district = i['name']
            
            insert_state_year_quarter(state,year,quarter,map_insurance_data)
            
            map_insurance_data['District'].append(district)
            map_insurance_data['Policy_count'].append(count)
            map_insurance_data['Policy_amount'].append(amount)

In [31]:
#Extract map transaction
def map_transaction(state,year,quarter,year_json):
    data = year_json['data']['hoverDataList']
    if(data):
        for i in data:
            district = i['name']
            count = i['metric'][0]['count']
            amount = i['metric'][0]['amount']
            
            insert_state_year_quarter(state,year,quarter,map_transaction_data)

            map_transaction_data['District'].append(district)
            map_transaction_data['Transaction_count'].append(count)
            map_transaction_data['Transaction_amount'].append(amount)

In [32]:
#Extract map user
def map_user(state,year,quarter,year_json):
    data = year_json['data']['hoverData']
    if(data):
        for district, values in data.items():
            users = values['registeredUsers']
            app_opens = values['appOpens']
            
            insert_state_year_quarter(state,year,quarter,map_user_data)
            
            map_user_data['District'].append(district)
            map_user_data['App_opens'].append(app_opens)
            map_user_data['Registered_Users'].append(users)
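Unlike the other map files, the user hover data is a dictionary keyed by district rather than a list, which is why `map_user()` iterates over `data.items()`. A sketch with a hand-written payload (the district names and numbers are invented; only the `registeredUsers` and `appOpens` keys come from the code above):

```python
# Hypothetical hoverData payload: a dict keyed by district name, not a list
hover_data = {
    'north goa district': {'registeredUsers': 1500, 'appOpens': 32000},
    'south goa district': {'registeredUsers': 1100, 'appOpens': 0},
}

# Unpacking each (key, value) pair directly avoids indexing i[0] / i[1]
records = [
    (district, stats['registeredUsers'], stats['appOpens'])
    for district, stats in hover_data.items()
]
for record in records:
    print(record)
```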

In [33]:
#Extract top insurance
def top_insurance(state,year,quarter,year_json):
    data = year_json['data']['pincodes']
    if(data):
        for i in data:
            pincode = i['entityName']
            count = i['metric']['count']
            amount = i['metric']['amount']
            
            insert_state_year_quarter(state,year,quarter,top_insurance_data)
            
            top_insurance_data['Pincodes'].append(pincode)
            top_insurance_data['Policy_count'].append(count)
            top_insurance_data['Policy_amount'].append(amount)

In [34]:
#Extract top transaction
def top_transaction(state,year,quarter,year_json):
    data = year_json['data']['pincodes']
    if(data):
        for i in data:
            pincode = i['entityName']
            count = i['metric']['count']
            amount = i['metric']['amount']
            
            insert_state_year_quarter(state,year,quarter,top_transaction_data)

            top_transaction_data['Pincodes'].append(pincode)
            top_transaction_data['Transaction_count'].append(count)
            top_transaction_data['Transaction_amount'].append(amount)

In [35]:
#Extract top user
def top_user(state,year,quarter,year_json):
    data = year_json['data']['pincodes']
    if(data):
        for i in data:
            pincode = i['name']
            users = i['registeredUsers']
            
            insert_state_year_quarter(state,year,quarter,top_user_data)
            
            top_user_data['Pincodes'].append(pincode)
            top_user_data['Registered_Users'].append(users)

<b>Accumulated extraction functions</b>

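perform_extraction() maps each operation name to its extraction function in a dictionary and calls the matching function, so the caller only has to pass the operation name. An unknown name falls back to invalid_extraction_op(), which raises an error.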
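This is an instance of dictionary dispatch: look a handler up by name and fall back to a default when the name is missing. The generic sketch below uses placeholder handlers (`handle_a`, `handle_b` and `unknown_operation` are hypothetical). Two details matter: the fallback has to accept the same arguments as the real handlers, otherwise Python raises a `TypeError` at the call before the intended error is reached, and the lookup table can be built once at module level rather than on every call.

```python
def handle_a(state, year, quarter, year_json):
    return f'a:{state}'

def handle_b(state, year, quarter, year_json):
    return f'b:{state}'

def unknown_operation(*args):
    # Accepts any arguments (via *args), so the raise below is what fails
    raise ValueError('Invalid operation')

# Built once, at import time, instead of inside the dispatch function
HANDLERS = {'a': handle_a, 'b': handle_b}

def dispatch(operation, *args):
    handler = HANDLERS.get(operation, unknown_operation)
    return handler(*args)

print(dispatch('a', 'goa', '2021', 1, {}))   # a:goa

try:
    dispatch('zzz', 'goa', '2021', 1, {})
except ValueError as err:
    print(err)                               # Invalid operation
```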

In [36]:
#Function to accumulate all the functions and call respective function

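#Fallback for an unknown operation name. It accepts the same arguments as the
#extraction functions, so the call raises this error instead of a TypeError
def invalid_extraction_op(*args):
    raise ValueError("Invalid operation")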

#Call the respective functions
def perform_extraction(state,year,quarter,operation,year_json):
    etl_functions = {
    "agg_insurance": agg_insurance,
    "agg_transaction": agg_transaction,
    "agg_user":agg_user,
    "map_insurance":map_insurance,
    "map_transaction":map_transaction,
    "map_user":map_user,
    "top_insurance":top_insurance,
    "top_transaction":top_transaction,
    "top_user":top_user,
    }

    chosen_extraction_function = etl_functions.get(operation, invalid_extraction_op)

    return chosen_extraction_function(state,year,quarter,year_json)

<b>Function to iterate through the directories</b>

iterate_through_files() takes the root directory and the name of the extraction operation as parameters.
It walks through the state and year folders and passes each quarterly JSON file to perform_extraction().
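A minimal pathlib-based sketch of the same three-level walk (state, then year, then quarter file). It swaps `os.listdir`/`os.walk` for `Path.iterdir()` and `Path.glob()`, sorts each level so the row order is reproducible, and skips anything that is not a `.json` file; those choices, and the helper name `walk_quarter_files`, are assumptions rather than part of the notebook.

```python
import tempfile
from pathlib import Path

def walk_quarter_files(root):
    # Yields (state, year, quarter, path) for root/<state>/<year>/<quarter>.json
    for state_dir in sorted(p for p in Path(root).iterdir() if p.is_dir()):
        for year_dir in sorted(p for p in state_dir.iterdir() if p.is_dir()):
            for quarter_file in sorted(year_dir.glob('*.json')):
                yield state_dir.name, year_dir.name, int(quarter_file.stem), quarter_file

with tempfile.TemporaryDirectory() as tmp:
    # Build a tiny hypothetical tree: goa/2021/{1,2}.json plus a stray non-JSON file
    year_dir = Path(tmp, 'goa', '2021')
    year_dir.mkdir(parents=True)
    for name in ('1.json', '2.json', 'notes.txt'):
        (year_dir / name).write_text('{}')
    found = [(s, y, q) for s, y, q, _ in walk_quarter_files(tmp)]

print(found)   # [('goa', '2021', 1), ('goa', '2021', 2)]
```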

In [37]:
#Function to iterate and get the JSON file
def iterate_through_files(directory,operation):
    #Iterate through the state folders
    for state in os.listdir(directory):
        state_path = os.path.join(directory, state)
        #Iterate through the year folders of the state
        for year in os.listdir(state_path):
            year_path = os.path.join(state_path, year)
            #Iterate for each quarter file (one JSON file per quarter, named by quarter number)
            for quarter_file in os.listdir(year_path):
                #Skip anything that is not a JSON file (e.g. .DS_Store)
                if not quarter_file.endswith('.json'):
                    continue
                quarter_path = os.path.join(year_path, quarter_file)
                #Extract the quarter from the file name
                quarter = int(Path(quarter_path).stem)
                #Read the json file
                year_json = read_json(quarter_path)
                #Function call for each file to enter it in the data dictionary
                perform_extraction(state,year,quarter,operation,year_json)


<a id = 'entry_point'></a>

<h3><center>*********<u>Entry Point of the program</u>*********</center></h3>


<center>Run all the cell blocks above and then run this to start the process.</center>

<b>Calling the functions to iterate through the directories</b>

1. Here we call the process() function once for each data directory. For each directory it:<br>
        -> Calls <i><u>iterate_through_files()</u></i>, which saves the extracted data into the corresponding data dictionary.<br>
        -> Saves that dictionary to a CSV file and keeps the returned DataFrame.<br>
        -> Stores the DataFrame in the dataframes dictionary, keyed by the corresponding SQL table name, to make the insertion easier.<br>
This is the main entry point.

In [38]:
#Function call for iterating through the files

dataframes = {} #Dictionary to save the dataframes, keyed by SQL table name

def process(directory,file_dict,operation,filename):
    iterate_through_files(get_directory(directory),operation)  #Iterate through the files
    df = save_to_csv(file_dict,filename)                        #Save to csv file and get the dataframe
    dataframes[operation] = df                                  #Save dataframe to dictionary

#Calling the process() function to extract
process('aggregated/insurance',agg_insurance_data,'agg_insurance','Agg_insurance_table.csv')          #Aggregated insurance
process('aggregated/transaction',agg_transaction_data,'agg_transaction','Agg_transaction_table.csv')  #Aggregated transaction
process('aggregated/user',agg_user_data,'agg_user','Agg_user_table.csv')                              #Aggregated user
process('map/insurance/hover',map_insurance_data,'map_insurance','Map_insurance_table.csv')           #Map insurance
process('map/transaction/hover',map_transaction_data,'map_transaction','Map_transaction_table.csv')   #Map transaction
process('map/user/hover',map_user_data,'map_user','Map_user_table.csv')                               #Map user
process('top/insurance',top_insurance_data,'top_insurance','Top_insurance_table.csv')                 #Top insurance
process('top/transaction',top_transaction_data,'top_transaction','Top_transaction_table.csv')         #Top transaction
process('top/user',top_user_data,'top_user','Top_user_table.csv')                                     #Top user
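The nine calls follow a regular naming scheme, so the operation name and CSV file name can be derived from the directory. The sketch below is a hypothetical table-driven variant (the `job_for` helper and `PREFIX` mapping are not in the notebook); it only builds the argument tuples and does not touch the file system.

```python
# Hypothetical helper: derive the operation key and CSV name from a sub-path,
# e.g. 'map/user/hover' -> ('map_user', 'Map_user_table.csv')
PREFIX = {'aggregated': 'agg', 'map': 'map', 'top': 'top'}

def job_for(sub_path):
    parts = sub_path.split('/')
    operation = f'{PREFIX[parts[0]]}_{parts[1]}'
    filename = f'{operation.capitalize()}_table.csv'
    return sub_path, operation, filename

sub_paths = [
    'aggregated/insurance', 'aggregated/transaction', 'aggregated/user',
    'map/insurance/hover', 'map/transaction/hover', 'map/user/hover',
    'top/insurance', 'top/transaction', 'top/user',
]
jobs = [job_for(p) for p in sub_paths]
for job in jobs:
    print(job)

# Inside the notebook each job could then be run as
#     process(sub_path, data_dicts[operation], operation, filename)
# where data_dicts is a hypothetical dict mapping each operation name to its data dictionary.
```

The matching data dictionary still has to be supplied explicitly, which is the trade-off against the nine explicit calls: fewer repeated literals, one more lookup structure.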


<h1><center>SQL PROCESSING</center></h1>

<b>Connect to Database</b>

In [39]:
#Connecting to the database
mydb = sql.connect(host="localhost",
                user="root",
                password="root",
                )
mycursor = mydb.cursor(buffered=True)

<b>Create database</b>

In [40]:
#Creating Database
query = 'Create database if not exists phonepe_pulse'
mycursor.execute(query)

<h3><center>CREATE TABLES</center></h3>

1. Defining the database to be used<br>

2. A dictionary that maps each table name to its table-specific column definitions (the State, Year and Quarter columns are common to all tables and are added by the create table function).<br>

3. A function that creates all the tables in one go, followed by the call that runs it.

In [41]:
#Defining the database to be used
mycursor.execute('Use phonepe_pulse')

In [42]:
#Dictionary with the table name as key and its table-specific columns as value
#Counts are whole numbers that can exceed the ~7 significant digits a single-precision FLOAT keeps,
#so they are stored as BIGINT; amounts and percentages stay DOUBLE
tablenames = {
    'agg_insurance':'Policy_count bigint,Policy_amount double',
    'agg_transaction':'Transaction_type varchar(100),Transaction_count bigint,Transaction_amount double',
    'agg_user':'Brand varchar(100),Registered_Users_per_brand bigint,Percentage double',
    'map_insurance':'District varchar(100),Policy_count bigint,Policy_amount double',
    'map_transaction':'District varchar(100),Transaction_count bigint,Transaction_amount double',
    'map_user':'District varchar(100),Registered_User bigint,App_opens bigint',
    'top_insurance':'Pincode int,Policy_count bigint,Policy_amount double',
    'top_transaction':'Pincode int,Transaction_count bigint,Transaction_amount double',
    'top_user':'Pincode int,Registered_User bigint'
}

In [43]:
#Function to create all the tables together in the database
def create_table(table_definitions):
    for key,value in table_definitions.items():
        #State, Year and Quarter are common to every table; value holds the table-specific columns
        query = f'create table if not exists {key} (State varchar(100), Year int, Quarter int,{value})'
        mycursor.execute(query)

In [44]:
#Function call to create all the tables
create_table(tablenames)
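Because `create_table()` only formats strings and executes them, the generated SQL can be previewed without a database connection. The sketch below uses an illustrative dictionary in the same "table name to extra columns" format (the table names are placeholders); printing the statements before executing them is a cheap way to spot a missing comma or type.

```python
# Illustrative definitions in the same "table name -> extra columns" format
table_defs = {
    'demo_table': 'District varchar(100),Amount double',
    'other_table': 'Pincode int',
}

def build_create_statements(definitions):
    # Prepend the State, Year and Quarter columns shared by every table
    return [
        f'create table if not exists {name} '
        f'(State varchar(100), Year int, Quarter int,{columns})'
        for name, columns in definitions.items()
    ]

statements = build_create_statements(table_defs)
for statement in statements:
    print(statement)
```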

<h3><center>INSERT QUERIES</center></h3>

1. The first cell uses the dataframes{} dictionary to insert data into all the tables in one loop.<br>

2. The second (commented-out) cell shows how to insert into a single table at a time, for debugging purposes.

In [45]:
#Insert query to insert data into all the tables in one loop (RECOMMENDED)
for key,value in dataframes.items():
    #One %s placeholder per column; the column order matches the table definition
    value_part = '(' + ', '.join(['%s'] * value.shape[1]) + ')'
    query = f'insert into {key} values {value_part}'
    for _,row in value.iterrows():
        mycursor.execute(query, tuple(row))
    #Commit once per table instead of after every row
    mydb.commit()


In [46]:
# #Insert query to insert data into the individual table(OPTIONAL)
# for j,row in dataframes['map_user'].iterrows():
#     value_part = '(' + ', '.join(['%s'] * len(row)) + ')'
#     query = f'insert into map_user values {value_part}'
#     mycursor.execute(query, tuple(row))
#     mydb.commit()
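mysql-connector-python's `cursor.executemany(operation, seq_params)` takes one statement and a sequence of parameter tuples, and for a simple `INSERT ... VALUES` it batches them into a multi-row insert. The sketch below only prepares the statement and the tuples from a hypothetical frame laid out like `agg_insurance`; the final two calls are left as comments because they need the live `mycursor` and `mydb` objects.

```python
import pandas as pd

# Hypothetical frame with the same column layout as agg_insurance
df = pd.DataFrame({'States': ['Goa', 'Goa'], 'Years': [2021, 2021], 'Quarters': [1, 2],
                   'Policy_count': [10, 4], 'Policy_amount': [2500.0, 900.5]})

placeholders = ', '.join(['%s'] * df.shape[1])      # one %s per column, built once
query = f'insert into agg_insurance values ({placeholders})'
rows = list(df.itertuples(index=False, name=None))  # plain tuples, index dropped

print(query)
print(rows)

# With a live connection this would replace the per-row loop:
# mycursor.executemany(query, rows)
# mydb.commit()
```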

In [6]:
import pandas as pd

df = pd.read_json('assets/india_states.geojson')
df['features'][0]['properties']

{'ST_NM': 'Arunachal Pradesh'}