<h1><center>Data Extraction from PhonePe Pulse Repo</center></h1>

<b>Importing Libraries</b>


In [1]:
#Import necessary libraries
import os
from getpass import getpass
from pathlib import Path

import git
import ipywidgets as widgets
import mysql.connector as sql
import pandas as pd

<b>Clone Git Repository</b>

In [2]:
# #Specify the directories
# git_url = 'https://github.com/PhonePe/pulse.git'
# cloned_directory = 'data/pulse_data'

# #Clone
# git.Repo.clone_from(git_url, cloned_directory)

<h1><center>DATA PROCESSING</center></h1>

[Entry Point](#entry_point)

<b>Declaring all the directories and the data dictionaries</b>

1. Here we declare the directories from which the data is extracted. All the paths share the same prefix and suffix. We therefore use a function that takes only the differing part as a parameter and builds the full path with an f-string. <br><br>
2. The data dictionaries in which the extracted data is collected before being converted to dataframes

In [3]:
#Directories
def get_directory(path, root='data/pulse_data'):
    """Return the state-level folder of one PhonePe Pulse dataset.

    Args:
        path: the dataset-specific part of the path, e.g. 'aggregated/user'
            or 'map/user/hover'.
        root: folder the pulse repository was cloned into. The default is the
            clone target used in the clone cell; override it if the repository
            lives somewhere else.

    Returns:
        The string '<root>/data/<path>/country/india/state'.
    """
    return f'{root}/data/{path}/country/india/state'

In [4]:
#Data Dictionaries
def _empty_columns(*column_names):
    """Return a new dict mapping each column name to its own empty list, in the given order."""
    return {name: [] for name in column_names}

# The key order below becomes the DataFrame/CSV column order. It must line up with the
# SQL table definitions, because rows are later inserted positionally.
agg_insurance_data = _empty_columns('States', 'Years', 'Quarters', 'Policy_count', 'Policy_amount')
agg_transaction_data = _empty_columns('States', 'Years', 'Quarters', 'Transaction_type', 'Transaction_count', 'Transaction_amount')
agg_user_data = _empty_columns('States', 'Years', 'Quarters', 'Brands', 'Registered_Users_per_brand', 'Percentage')

map_insurance_data = _empty_columns('States', 'Years', 'Quarters', 'District', 'Policy_count', 'Policy_amount')
map_transaction_data = _empty_columns('States', 'Years', 'Quarters', 'District', 'Transaction_count', 'Transaction_amount')
map_user_data = _empty_columns('States', 'Years', 'Quarters', 'District', 'Registered_Users', 'App_opens')

top_insurance_data = _empty_columns('States', 'Years', 'Quarters', 'Pincodes', 'Policy_count', 'Policy_amount')
top_transaction_data = _empty_columns('States', 'Years', 'Quarters', 'Pincodes', 'Transaction_count', 'Transaction_amount')
top_user_data = _empty_columns('States', 'Years', 'Quarters', 'Pincodes', 'Registered_Users')

<b>Defining necessary functions</b>

1. read_json - This function takes the path of a JSON file as parameter and returns its contents, as read by pandas.read_json

2. convert_to_dataframe - Function that takes a dictionary and converts to a pandas dataframe

3. save_to_csv - This function takes a dictionary and the name of the file as parameters. It converts the dictionary to a pandas dataframe and writes the dataframe to a csv file.
It also returns the converted dataframe

In [5]:
#Read JSON
def read_json(path):
    """Load one Pulse JSON file with pandas and return the resulting DataFrame.

    The extraction functions index the result as ``year_json['data'][<key>]``,
    i.e. column 'data', row <key>.
    """
    frame = pd.read_json(path)
    return frame

In [6]:
#Convert to dataframe
def convert_to_dataframe(file_dict):
    """Build a DataFrame whose columns are the keys of ``file_dict``, in insertion order.

    Each value is the list of that column's values; pandas requires the lists to
    have equal length.
    """
    return pd.DataFrame.from_dict(file_dict, orient='columns')

In [7]:
#Save the dataframe to csv file
def save_to_csv(file_dict,filename,directory='data/csv_data'):
    """Convert ``file_dict`` to a DataFrame, write it as CSV and return it.

    Args:
        file_dict: dict mapping column name -> list of values (one of the
            module-level data dictionaries).
        filename: CSV file name, e.g. 'Top_user_table.csv'.
        directory: folder to write into; created (with parents) if missing.

    Returns:
        The DataFrame that was written. The index is not written to the file.
    """
    #Convert the dictionary to dataframe
    df = convert_to_dataframe(file_dict)
    # DataFrame.to_csv does not create missing folders, and 'data/csv_data' does not
    # exist after a fresh clone, so make sure it exists first.
    output_dir = Path(directory)
    output_dir.mkdir(parents=True, exist_ok=True)
    df.to_csv(output_dir / filename, index=False)
    return df

<b>Extraction Functions</b>

1. This block contains all the functions that extract data from the JSON files and save it to the data dictionaries.
Parameters are state, year, quarter and the json data

2. Function to enter state, year and quarter, as these values are common throughout the data


In [8]:
#Function to enter state,year and quarter as they are common in all the data
def insert_state_year_quarter(state,year,quarter,dict_file):
    """Append the shared State/Year/Quarter values of one row to ``dict_file``.

    The state folder name has '-' replaced by spaces and is passed through
    str.capitalize(). That upper-cases only the first character and lower-cases the
    rest, e.g. 'andaman-&-nicobar-islands' -> 'Andaman & nicobar islands'.
    ``year`` and ``quarter`` are stored unchanged.
    """
    readable_state = state.replace('-', ' ').capitalize()
    for column, value in (('States', readable_state), ('Years', year), ('Quarters', quarter)):
        dict_file[column].append(value)

In [9]:
#Extract aggregated insurance
def agg_insurance(state,year,quarter,year_json):
    """Append one agg_insurance_data row per entry of data.transactionData.

    Only the first element of each entry's 'paymentInstruments' list is read.
    Nothing is appended when transactionData is empty or None.
    """
    entries = year_json['data']['transactionData']
    if not entries:
        return
    for entry in entries:
        instrument = entry['paymentInstruments'][0]
        policy_count, policy_amount = instrument['count'], instrument['amount']

        insert_state_year_quarter(state, year, quarter, agg_insurance_data)

        agg_insurance_data['Policy_count'].append(policy_count)
        agg_insurance_data['Policy_amount'].append(policy_amount)

In [10]:
#Extract aggregated transaction
def agg_transaction(state,year,quarter,year_json):
    """Append one agg_transaction_data row per entry of data.transactionData.

    The entry's 'name' becomes Transaction_type; count and amount come from the
    first element of its 'paymentInstruments' list. Nothing is appended when
    transactionData is empty or None.
    """
    entries = year_json['data']['transactionData']
    if not entries:
        return
    for entry in entries:
        # named transaction_type rather than `type` to avoid shadowing the builtin
        transaction_type = entry['name']
        instrument = entry['paymentInstruments'][0]
        transaction_count, transaction_amount = instrument['count'], instrument['amount']

        insert_state_year_quarter(state, year, quarter, agg_transaction_data)

        agg_transaction_data['Transaction_type'].append(transaction_type)
        agg_transaction_data['Transaction_count'].append(transaction_count)
        agg_transaction_data['Transaction_amount'].append(transaction_amount)

In [11]:
#Extract aggregated user
def agg_user(state,year,quarter,year_json):
    """Append one agg_user_data row per device brand in data.usersByDevice.

    Reads each entry's 'brand', 'count' and 'percentage'. Nothing is appended when
    usersByDevice is empty or None.
    """
    devices = year_json['data']['usersByDevice']
    if not devices:
        return
    for device in devices:
        brand, brand_users, share = device['brand'], device['count'], device['percentage']

        insert_state_year_quarter(state, year, quarter, agg_user_data)

        agg_user_data['Brands'].append(brand)
        agg_user_data['Percentage'].append(share)
        agg_user_data['Registered_Users_per_brand'].append(brand_users)


In [12]:
#Extract map insurance
def map_insurance(state,year,quarter,year_json):
    """Append one map_insurance_data row per district in data.hoverDataList.

    Count and amount come from the first element of each entry's 'metric' list; the
    district is the entry's 'name'. Nothing is appended when hoverDataList is empty
    or None.
    """
    districts = year_json['data']['hoverDataList']
    if not districts:
        return
    for entry in districts:
        metric = entry['metric'][0]
        policy_count, policy_amount = metric['count'], metric['amount']
        district_name = entry['name']

        insert_state_year_quarter(state, year, quarter, map_insurance_data)

        map_insurance_data['District'].append(district_name)
        map_insurance_data['Policy_count'].append(policy_count)
        map_insurance_data['Policy_amount'].append(policy_amount)

In [13]:
#Extract map transaction
def map_transaction(state,year,quarter,year_json):
    """Append one map_transaction_data row per district in data.hoverDataList.

    The district is the entry's 'name'; count and amount come from the first
    element of its 'metric' list. Nothing is appended when hoverDataList is empty
    or None.
    """
    districts = year_json['data']['hoverDataList']
    if not districts:
        return
    for entry in districts:
        district_name = entry['name']
        metric = entry['metric'][0]
        transaction_count, transaction_amount = metric['count'], metric['amount']

        insert_state_year_quarter(state, year, quarter, map_transaction_data)

        map_transaction_data['District'].append(district_name)
        map_transaction_data['Transaction_count'].append(transaction_count)
        map_transaction_data['Transaction_amount'].append(transaction_amount)

In [14]:
#Extract map user
def map_user(state,year,quarter,year_json):
    """Append one map_user_data row per district in data.hoverData.

    hoverData is iterated with .items(): each key is a district name and each value
    supplies 'registeredUsers' and 'appOpens'. Nothing is appended when hoverData is
    empty or None.
    """
    hover = year_json['data']['hoverData']
    if not hover:
        return
    for district_name, stats in hover.items():
        registered, opens = stats['registeredUsers'], stats['appOpens']

        insert_state_year_quarter(state, year, quarter, map_user_data)

        map_user_data['District'].append(district_name)
        map_user_data['App_opens'].append(opens)
        map_user_data['Registered_Users'].append(registered)

In [15]:
#Extract top insurance
def top_insurance(state,year,quarter,year_json):
    """Append one top_insurance_data row per entry in data.pincodes.

    The pincode is the entry's 'entityName'; count and amount come from its 'metric'
    dict. Nothing is appended when pincodes is empty or None.
    """
    pincode_entries = year_json['data']['pincodes']
    if not pincode_entries:
        return
    for entry in pincode_entries:
        pincode = entry['entityName']
        metric = entry['metric']
        policy_count, policy_amount = metric['count'], metric['amount']

        insert_state_year_quarter(state, year, quarter, top_insurance_data)

        top_insurance_data['Pincodes'].append(pincode)
        top_insurance_data['Policy_count'].append(policy_count)
        top_insurance_data['Policy_amount'].append(policy_amount)

In [33]:
#Extract top transaction
def top_transaction(state,year,quarter,year_json):
    """Append one top_transaction_data row per entry in data.pincodes.

    The pincode is the entry's 'entityName'; count and amount come from its 'metric'
    dict. Nothing is appended when pincodes is empty or None.
    """
    pincode_entries = year_json['data']['pincodes']
    if not pincode_entries:
        return
    for entry in pincode_entries:
        pincode = entry['entityName']
        metric = entry['metric']
        transaction_count, transaction_amount = metric['count'], metric['amount']

        insert_state_year_quarter(state, year, quarter, top_transaction_data)

        top_transaction_data['Pincodes'].append(pincode)
        top_transaction_data['Transaction_count'].append(transaction_count)
        top_transaction_data['Transaction_amount'].append(transaction_amount)

In [17]:
#Extract top user
def top_user(state,year,quarter,year_json):
    """Append one top_user_data row per entry in data.pincodes.

    Reads each entry's 'name' (the pincode) and 'registeredUsers'. Nothing is
    appended when pincodes is empty or None.
    """
    pincode_entries = year_json['data']['pincodes']
    if not pincode_entries:
        return
    for entry in pincode_entries:
        pincode, registered = entry['name'], entry['registeredUsers']

        insert_state_year_quarter(state, year, quarter, top_user_data)

        top_user_data['Pincodes'].append(pincode)
        top_user_data['Registered_Users'].append(registered)

<b>Accumulated extraction functions</b>

This function maps each operation name to its extraction function, so that the right extraction function can be called by name

In [18]:
#Function to accumulate all the functions and call respective function

#Check Invalid
def invalid_extraction_op(*args, **kwargs):
    """Fallback for an unknown operation name; always raises.

    perform_extraction calls the looked-up function as
    f(state, year, quarter, year_json). The fallback therefore has to accept (and
    ignore) arguments, otherwise the call fails with an unrelated TypeError before
    this error is raised.

    Raises:
        ValueError: always, with the message "Invalid operation".
    """
    raise ValueError("Invalid operation")

#Call the respective functions
def perform_extraction(state,year,quarter,operation,year_json):
    """Dispatch one parsed JSON file to the extraction function named by ``operation``.

    Args:
        state: state folder name.
        year: year folder name.
        quarter: quarter number taken from the file name.
        operation: one of the keys of ``etl_functions`` below, e.g. 'agg_user'.
        year_json: the parsed file, as returned by read_json.

    Returns:
        The extraction function's return value. The functions in this notebook
        return None and append to their module-level data dictionary.

    Raises:
        ValueError: if ``operation`` is not a known extraction name.
    """
    etl_functions = {
        "agg_insurance": agg_insurance,
        "agg_transaction": agg_transaction,
        "agg_user": agg_user,
        "map_insurance": map_insurance,
        "map_transaction": map_transaction,
        "map_user": map_user,
        "top_insurance": top_insurance,
        "top_transaction": top_transaction,
        "top_user": top_user,
    }

    chosen_extraction_function = etl_functions.get(operation)
    if chosen_extraction_function is None:
        # Fail with the offending name rather than calling a fallback with the wrong arity.
        raise ValueError(f"Invalid operation: {operation!r}")

    return chosen_extraction_function(state, year, quarter, year_json)

<b>Function to iterate through the directories</b>

This function takes a directory path and the extraction function name as parameter.
It goes through the directories and sends the JSON file for extraction

In [19]:
#Function to iterate and get the JSON file
def iterate_through_files(directory,operation):
    """Walk <directory>/<state>/<year>/<quarter>.json and extract every file.

    Args:
        directory: state-level root folder, e.g. the value returned by get_directory().
        operation: extraction name understood by perform_extraction, e.g. 'agg_user'.

    States, years and quarter files are visited in sorted order, so the rows in the
    data dictionaries (and thus the CSVs) come out in a reproducible order.
    os.listdir() order is otherwise arbitrary. Non-directory entries at the
    state/year level and non-'.json' files inside a year folder are skipped. Without
    this, a stray file such as '.DS_Store' would make os.listdir() or int() raise.
    """
    for state in sorted(os.listdir(directory)):
        state_path = os.path.join(directory, state)
        if not os.path.isdir(state_path):
            continue
        for year in sorted(os.listdir(state_path)):
            year_path = os.path.join(state_path, year)
            if not os.path.isdir(year_path):
                continue
            # Only regular files directly inside the year folder, as the original
            # os.walk(...) + break did, restricted to JSON files.
            quarter_files = sorted(
                name for name in os.listdir(year_path)
                if name.endswith('.json') and os.path.isfile(os.path.join(year_path, name))
            )
            for quarter_file in quarter_files:
                quarter_path = os.path.join(year_path, quarter_file)
                # Files are named by quarter number, e.g. '3.json' -> 3.
                quarter = int(Path(quarter_file).stem)
                year_json = read_json(quarter_path)
                perform_extraction(state, year, quarter, operation, year_json)


<a id = 'entry_point'></a>

<h3><center>*********<u>Entry Point of the program</u>*********</center></h3>


<center>Run all the cell blocks above and then run this to start the process</center>

<b>Calling the functions to iterate through the directories</b>

-> Here we call the <i><u>iterate_through_files()</u></i> for all the directories.<br>
-> Then the extracted data is saved into the data dictionaries.<br>
-> After that the dictionary is saved to the csv file, and the dataframe returned is saved in a variable.<br>
-> Then all the dataframes are saved to a dictionary where the key is the corresponding sql table name, to make the insertion easier<br>
-> This is the main entry point

In [None]:
#Function calls for iterating through the files

# (dataset path under the pulse repo, extraction operation == SQL table name,
#  data dictionary the operation fills, CSV file name)
EXTRACTION_JOBS = [
    ('aggregated/insurance', 'agg_insurance', agg_insurance_data, 'Aggregated_insurance_table.csv'),
    ('aggregated/transaction', 'agg_transaction', agg_transaction_data, 'Aggregated_transaction_table.csv'),
    ('aggregated/user', 'agg_user', agg_user_data, 'Aggregated_user_table.csv'),
    ('map/insurance/hover', 'map_insurance', map_insurance_data, 'Map_insurance_table.csv'),
    ('map/transaction/hover', 'map_transaction', map_transaction_data, 'Map_transaction_table.csv'),
    ('map/user/hover', 'map_user', map_user_data, 'Map_user_table.csv'),
    ('top/insurance', 'top_insurance', top_insurance_data, 'Top_insurance_table.csv'),
    ('top/transaction', 'top_transaction', top_transaction_data, 'Top_transaction_table.csv'),
    ('top/user', 'top_user', top_user_data, 'Top_user_table.csv'),
]

dataframes = {}
for pulse_path, operation, data_dict, csv_name in EXTRACTION_JOBS:
    # The data dictionaries are module-level and only ever appended to. Empty them
    # in place first so that re-running this cell does not duplicate every row.
    for column_values in data_dict.values():
        column_values.clear()
    iterate_through_files(get_directory(pulse_path), operation)
    dataframes[operation] = save_to_csv(data_dict, csv_name)

# Keep the per-table variables the original cell defined, in case later cells use them.
(agg_ins, agg_tran, agg_us,
 map_ins, map_tran, map_us,
 top_ins, top_tran, top_us) = dataframes.values()

# Show a compact summary (rows, columns) instead of dumping nine full DataFrames.
{name: frame.shape for name, frame in dataframes.items()}

<h1><center>SQL PROCESSING</center></h1>

<b>Connect to Database</b>

In [21]:
# Connection settings come from the environment (MYSQL_HOST, MYSQL_USER,
# MYSQL_PASSWORD), so no credentials are stored in the notebook. Host and user
# default to 'localhost' and 'root'. If MYSQL_PASSWORD is unset, the password is
# prompted for.
mydb = sql.connect(host=os.environ.get("MYSQL_HOST", "localhost"),
                   user=os.environ.get("MYSQL_USER", "root"),
                   password=os.environ.get("MYSQL_PASSWORD") or getpass("MySQL password: "),
                   )
# buffered=True makes the cursor fetch each result set fully after execute().
mycursor = mydb.cursor(buffered=True)

<b>Create database</b>

In [22]:
# query = 'Create database if not exists phonepe_pulse'
# mycursor.execute(query)

<h3><center>CREATE TABLES</center></h3>

In [23]:
#Defining the database to be used
# The creation cell above is commented out, so 'Use' alone fails on a fresh MySQL
# server. "if not exists" makes the CREATE a no-op when the database already exists.
mycursor.execute('Create database if not exists phonepe_pulse')
mycursor.execute('Use phonepe_pulse')

In [24]:
# SQL column definitions per table. create_table prepends State, Year and Quarter.
# - Keys must equal the keys of `dataframes`, since each DataFrame is inserted into
#   the table of the same name.
# - Column order must follow the DataFrame column order, since the INSERTs are
#   positional.
# Transaction counts and app opens use BIGINT because national-scale totals may
# exceed INT's maximum of 2,147,483,647.
# NOTE: "create table if not exists" does not alter tables that already exist.
tablenames = {
    'agg_insurance':'Policy_count int,Policy_amount double',
    'agg_transaction':'Transaction_type varchar(100),Transaction_count bigint,Transaction_amount double',
    'agg_user':'Brand varchar(100),Registered_Users_per_brand int,Percentage double',
    'map_insurance':'District varchar(100),Policy_count int,Policy_amount double',
    'map_transaction':'District varchar(100),Transaction_count bigint,Transaction_amount double',
    # must be 'map_user' to match dataframes['map_user'] used by the INSERT loop
    'map_user':'District varchar(100),Registered_User int,App_opens bigint',
    'top_insurance':'Pincode int,Policy_count int,Policy_amount double',
    'top_transaction':'Pincode int,Transaction_count bigint,Transaction_amount double',
    'top_user':'Pincode int,Registered_User int'
}

In [25]:
#Function to create all tables together in database
def create_table(table_name):
    """Create every table described by ``table_name`` if it does not exist yet.

    Args:
        table_name: dict mapping SQL table name -> comma-separated column
            definitions. Every table gets State, Year and Quarter as its first
            three columns.

    Executes on the module-level ``mycursor``.
    """
    for table, columns in table_name.items():
        mycursor.execute(f'create table if not exists {table} (State varchar(100), Year int, Quarter int,{columns})')

In [26]:
# Create every table declared in `tablenames` (tables that already exist are left untouched).
create_table(tablenames)

In [27]:
# mycursor.execute("create table agg_trans (State varchar(100), Year int, Quarter int, Transaction_type varchar(100), Transaction_count int, Transaction_amount double)")

# for i,row in df_agg_trans.iterrows():
#     #here %S means string values 
#     sql = "INSERT INTO agg_trans VALUES (%s,%s,%s,%s,%s,%s)"
#     mycursor.execute(sql, tuple(row))
#     # the connection is not auto committed by default, so we must commit to save our changes
#     mydb.commit()

<h3><center>INSERT QUERIES</center></h3>

In [None]:
# Insert every extracted DataFrame into the SQL table with the same name.
for table, frame in dataframes.items():
    # One comma-separated "%s" placeholder per column: VALUES (%s, %s, ...).
    placeholders = ", ".join(["%s"] * frame.shape[1])
    query = f'INSERT INTO {table} VALUES ({placeholders})'
    # mysql-connector cannot bind numpy scalars or NaN. Convert each row to plain
    # Python objects and map missing values to None (SQL NULL).
    rows = [tuple(row) for row in frame.astype(object).where(frame.notna(), None).values.tolist()]
    if rows:
        mycursor.executemany(query, rows)
    # Commit each table once, after all of its rows are inserted.
    mydb.commit()
