In [1]:
# @title Clone Pulse Data
!git clone https://github.com/PhonePe/pulse.git

Cloning into 'pulse'...
remote: Enumerating objects: 15229, done.[K
remote: Counting objects: 100% (27/27), done.[K
remote: Compressing objects: 100% (26/26), done.[K
remote: Total 15229 (delta 3), reused 1 (delta 1), pack-reused 15202[K
Receiving objects: 100% (15229/15229), 21.95 MiB | 16.61 MiB/s, done.
Resolving deltas: 100% (7745/7745), done.


In [2]:
# @title Imports
import os
import json
import pandas as pd
import numpy as np

In [3]:
# @title Set Target Path
target_path = 'pulse/data' # @param {type: "string"}

In [4]:
# @title Data Format Function
def format_data(_data: dict, _type: str, ) -> dict:

  # formating aggregated insurance and transation data
  if _type == 'aggregated/insurance' or _type == 'aggregated/transaction':
    # setting default dictiony format to return
    r_dict = {'trans_type': [], 'trans_count': [],
              'trans_amount': []}

    # formating data
    if _data['data']['transactionData']:
      for d in _data['data']['transactionData']:
        r_dict['trans_type'].append(d['name'])

        # raising error in case more than 1 payment instruments are found
        if len(d['paymentInstruments']) > 1:
          raise TypeError('Multiple payment instruments found')
        else:
          for i in d['paymentInstruments']:
            r_dict['trans_count'].append(i['count'])
            r_dict['trans_amount'].append(i['amount'])

  # formating aggregated user data
  elif _type == 'aggregated/user':
    # setting default dictiony format to return
    r_dict = {'user_brand': [], 'user_count': [],
              'user_percent': [],
              # 'agg_count': [],
              # 'agg_opens': []
              }

    # formating data
    if _data['data']['usersByDevice']:
      agg_data = _data['data']['aggregated']
      for d in _data['data']['usersByDevice']:
        r_dict['user_brand'].append(d['brand'])
        r_dict['user_count'].append(d['count'])
        r_dict['user_percent'].append(d['percentage'])
        # r_dict['agg_count'] = agg_data['registeredUsers']
        # r_dict['agg_opens'] = agg_data['appOpens']

  # formating map insurance and transation hover data
  elif _type == 'map/insurance/hover' or _type == 'map/transaction/hover':
    # setting default dictiony format to return
    r_dict = {'district': [],
              'trans_count': [], 'trans_amount': [],
              # 'trans_type': []
              }
    # typ = 'transaction' if 'transaction' in _type else 'insurance'

    # formating data
    if _data['data']['hoverDataList']:
      for d in _data['data']['hoverDataList']:
        r_dict['district'].append(d['name'])
        # r_dict['trans_type'].append(typ)

        # raising error in case more than 1 metric data is found
        if len(d['metric']) > 1:
          raise TypeError('Multiple data metrics found')
        else:
          for i in d['metric']:
            r_dict['trans_count'].append(i['count'])
            r_dict['trans_amount'].append(i['amount'])

  # formating map user hover data
  elif _type == 'map/user/hover':
    # setting default dictiony format to return
    r_dict = {'district': [], 'user_count': [],
              'user_opens': []}
    hover_data = _data['data']['hoverData']

    # formating data
    if hover_data:
      for d in hover_data.keys():
        r_dict['district'].append(d)
        r_dict['user_count'].append(hover_data[d]['registeredUsers'])
        r_dict['user_opens'].append(hover_data[d]['appOpens'])

  # formating top insurance and transation data
  elif _type == 'top/insurance' or _type == 'top/transaction':
    # setting default dictiony format to return
    r_dict = {'entity_type': [], 'entity_name': [],
              'rank': [], 'trans_count': [],
              'trans_amount': [],
              # 'trans_type': []
              }
    # typ = 'transaction' if 'transaction' in _type else 'insurance'

    # formating district data
    if _data['data']['districts']:
      rank = 1
      for d in _data['data']['districts']:
        r_dict['entity_type'].append('district')
        r_dict['rank'].append(rank)
        r_dict['entity_name'].append(d['entityName'])
        # r_dict['trans_type'].append(typ)
        r_dict['trans_count'].append(d['metric']['count'])
        r_dict['trans_amount'].append(d['metric']['amount'])
        rank += 1

    # formating pincode data
    if _data['data']['pincodes']:
      rank = 1
      for d in _data['data']['pincodes']:
        r_dict['entity_type'].append('pincode')
        r_dict['rank'].append(rank)
        r_dict['entity_name'].append(d['entityName'])
        # r_dict['trans_type'].append(typ)
        r_dict['trans_count'].append(d['metric']['count'])
        r_dict['trans_amount'].append(d['metric']['amount'])
        rank += 1

  # formating top user data
  elif _type == 'top/user':
    # setting default dictiony format to return
    r_dict = {'entity_type': [], 'rank': [],
              'entity_name': [], 'user_count': []}

    # formating district data
    if _data['data']['districts']:
      rank = 1
      for d in _data['data']['districts']:
        r_dict['entity_type'].append('district')
        r_dict['rank'].append(rank)
        r_dict['entity_name'].append(d['name'])
        r_dict['user_count'].append(d['registeredUsers'])
        rank += 1

    # formating pincode data
    if _data['data']['pincodes']:
      rank = 1
      for d in _data['data']['pincodes']:
        r_dict['entity_type'].append('pincode')
        r_dict['rank'].append(rank)
        r_dict['entity_name'].append(d['name'])
        r_dict['user_count'].append(d['registeredUsers'])
        rank += 1

  # raise Error in case of invalid data type
  else:
    raise ValueError('Invalid data type')

  return r_dict

In [5]:
# @title Formate Data as Data Frame
data_dict = {'aggregated/insurance': [],
             'aggregated/transaction': [],
             'aggregated/user': [],
             'map/insurance/hover': [],
             'map/transaction/hover': [],
             'map/user/hover': [],
             'top/insurance': [],
             'top/transaction': [],
             'top/user': []
             }

# iterates through the directories from the @target_path
for files in os.walk(target_path):
  # iterates through all state directories that contains files
  if 'state' in files[0] and files[2]:
    state, year = files[0].split('/')[-2:]
    state = state.replace('-islands', '').replace('-', ' ')

    year = int(year)

    # cashing data with respect to data type
    for x in data_dict.keys():
      if x in files[0]:
        _d = data_dict[x]
        print(f'processing: {x}...') if not _d else None

        # iterates through all files in the directory
        for f in files[2]:
          f_path = os.path.join(files[0], f)

          # reading file
          with open(f_path, 'r') as file:
            data = json.load(file)
            df = pd.DataFrame(format_data(data, x))

            # Adding Default info
            if not df.empty:
              df.loc[:, 'state'] = state.title()
              df.loc[:, 'year'] = year
              df.loc[:, 'quarter'] = int(f[0])
              _d.append(df)

processing: top/insurance...
processing: top/transaction...
processing: top/user...
processing: aggregated/insurance...
processing: aggregated/transaction...
processing: aggregated/user...
processing: map/insurance/hover...
processing: map/transaction/hover...
processing: map/user/hover...


In [6]:
# @title Pre Processing Data Frame
df_dict = {}
for d in data_dict.keys():
  df = pd.concat(data_dict[d])
  df.sort_values(by=['state', 'year', 'quarter'], inplace=True)
  df.reset_index(drop=True, inplace=True)
  d = d[:-6] if d.endswith('hover') else d
  df_dict[d.replace('/','_')] = df

In [7]:
# @title Creating Data Base with sqlite
import sqlite3

# Connecting to database
conn = sqlite3.connect('pulse_data.db')
cur = conn.cursor()

# Creating the dtype dict
typ_dict = {np.dtype(int): 'bigint', np.dtype(str): 'text',
            np.dtype(float): 'decimal', np.dtype(object): 'text'}

# Iterating through the data frames in df_dict
for table_name, df in df_dict.items():
    # Droping the table if exists
    cur.execute(f'''drop table if exists {table_name}''')

    # Createing table
    cur.execute(f'''create table {table_name}
                ({', '.join([f'{col} {typ_dict[df[col].dtype]}'
                             for col in df.columns])})''')

    # Inserting the datas into the table
    for row in df.itertuples(index=False):
        cur.execute(f'''insert into {table_name} ({', '.join(df.columns)})
                    values ({('?, ' * len(df.columns))[:-2]})''', row)

    print(table_name, 'Table Created!')

# Commit the changes to the database
conn.commit()

# Close the connection to the database
conn.close()

aggregated_insurance Table Created!
aggregated_transaction Table Created!
aggregated_user Table Created!
map_insurance Table Created!
map_transaction Table Created!
map_user Table Created!
top_insurance Table Created!
top_transaction Table Created!
top_user Table Created!


In [8]:
# @title Show Database Contents
conn = sqlite3.connect('pulse_data.db')
cur = conn.cursor()

# Get all table names
tables = cur.execute('''select name from sqlite_master
                      where type="table"''').fetchall()

# Print table names and their columns
for table_name in tables:
    print(f'Table: {table_name[0]}')
    columns = cur.execute(f'''pragma table_info
                          ('{table_name[0]}')''').fetchall()
    for column in columns:
        print(f'''\t{f'{column[1]}':<15}\t({column[2]})''')

conn.close()


Table: aggregated_insurance
	trans_type     	(TEXT)
	trans_count    	(bigint)
	trans_amount   	(decimal)
	state          	(TEXT)
	year           	(bigint)
	quarter        	(bigint)
Table: aggregated_transaction
	trans_type     	(TEXT)
	trans_count    	(bigint)
	trans_amount   	(decimal)
	state          	(TEXT)
	year           	(bigint)
	quarter        	(bigint)
Table: aggregated_user
	user_brand     	(TEXT)
	user_count     	(bigint)
	user_percent   	(decimal)
	state          	(TEXT)
	year           	(bigint)
	quarter        	(bigint)
Table: map_insurance
	district       	(TEXT)
	trans_count    	(bigint)
	trans_amount   	(decimal)
	state          	(TEXT)
	year           	(bigint)
	quarter        	(bigint)
Table: map_transaction
	district       	(TEXT)
	trans_count    	(bigint)
	trans_amount   	(decimal)
	state          	(TEXT)
	year           	(bigint)
	quarter        	(bigint)
Table: map_user
	district       	(TEXT)
	user_count     	(bigint)
	user_opens     	(bigint)
	state          	(