In [1]:
# Mount Google Drive inside the Colab runtime at /content/drive so that the
# later cells can read and write files stored on Drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
%%bash
# Clones the PhonePe Pulse GitHub repository to Google Drive.
# The %%bash cell magic has to be the very first line of the cell, so the
# description lives in shell comments instead of a Python string above it.
# Safe to re-run: the clone is skipped when the checkout already exists.
set -e  # stop at the first failing command (e.g. the Drive folder is missing)
cd /content/drive/MyDrive/Learn/guvi/labs/Assignments/PhonePePulse
mkdir -p phonepe
cd phonepe
if [ ! -d pulse/.git ]; then
  git clone https://github.com/PhonePe/pulse.git
fi


In [2]:
'''
  Data loading functions that are used to load data
  from the JSON files on disk and store them into
  corresponding data frames. The data frame is closely
  modeled after the database schema
'''
import pandas as pd
import json
import os

def get_abs_fpaths(dir):
    """Lazily yield the absolute path of every file found beneath `dir`.

    The tree is traversed with os.walk, so files in nested sub-folders are
    included. Nothing is yielded when `dir` contains no files.
    """
    for parent, _subdirs, names in os.walk(dir):
        yield from (
            os.path.abspath(os.path.join(parent, name)) for name in names
        )

# Load a JSON data file into a dataframe,given the full path
def load_file (f):
  """Read one JSON data file into a DataFrame.

  Args:
    f: full path of the JSON file.

  Returns:
    The DataFrame produced by pandas.read_json with the 'success' and
    'code' columns removed.

  Raises:
    Exception: when the file does not exist, or when it cannot be parsed
      or lacks the 'success'/'code' columns. The underlying error is
      chained so the real cause stays visible in the traceback.
  """
  if not os.path.isfile(f):
    raise Exception(f'''File not found: {f}''')
  try:
    df = pd.read_json(f)
    # DataFrame.drop returns a new frame; the result has to be kept,
    # otherwise the envelope columns silently stay in the data.
    return df.drop(columns=['success','code'])
  except Exception as exc:
    # Not a missing file (checked above): report what actually went wrong.
    raise Exception(f'''Could not load {f}: {exc}''') from exc

# Decode common record attributes from a JSON file
def load_common(f, record_class, retd):
  """Append the attributes encoded in a data file's path to `retd`.

  The last two path components are taken as `<year>/<quarter>.<ext>`.
  When the directory two levels above the year is named exactly 'state'
  (i.e. `.../state/<state-name>/<year>/<quarter>.json`, the layout used by
  the PhonePe Pulse data folder) the record is a state record named after
  `<state-name>`; otherwise it is the country-level record for 'india'.

  Args:
    f: path of the data file.
    record_class: accepted so every loader can pass its own arguments
      through unchanged; not used here.
    retd: dict of lists; one value is appended to each of the 'year',
      'quarter', 'geo_type' and 'geo_name' lists.
  """
  parts = os.path.normpath(f).split(os.sep)
  year, quarter_file = parts[-2:]
  quarter = quarter_file.split('.')[0]

  # Compare the path *component*, not a substring of the whole path: a root
  # folder such as ".../realestate/..." must not turn country files into
  # state files.
  if len(parts) >= 4 and parts[-4] == 'state':
    geo_type, geo_name = 'State', parts[-3]
  else:
    geo_type, geo_name = 'Country', 'india'

  retd['year'].append(year)
  retd['quarter'].append(quarter)
  retd['geo_type'].append(geo_type)
  retd['geo_name'].append(geo_name)
  return

def load_top_txns(f,record_class,retd):
  """Append the 'top' transaction rows of one JSON file to `retd`.

  The file is read with load_file and its 'states', 'districts' and
  'pincodes' entries (in the 'data' column) are processed in that order.
  Each entry becomes one row: the path-derived fields come from
  load_common, then geo_type/geo_name are overwritten with the level and
  the entry's 'entityName', 'category' is None and stat_type/count/amount
  come from the entry's 'metric'. Sections that are None are skipped.
  """
  frame = load_file(f)

  # Look up all three sections before anything is written to retd.
  sections = (
    ('State', frame.loc['states','data']),
    ('District', frame.loc['districts','data']),
    ('Pincode', frame.loc['pincodes','data']),
  )

  for level, entries in sections:
    if entries is None:
      continue
    for entry in entries:
      load_common(f, record_class, retd)
      retd['category'].append(None)
      # Replace the path-derived geography with the entry's own.
      retd['geo_type'][-1] = level
      retd['geo_name'][-1] = entry['entityName']
      metric = entry['metric']
      retd['stat_type'].append(metric["type"])
      retd['count'].append(metric["count"])
      retd['amount'].append(metric["amount"])

  return

def load_hover_txns(f,record_class,retd):
  """Placeholder loader for 'hover' transaction files.

  Ignores its arguments, leaves `retd` untouched and returns None.
  """
  return

# Decode 'transaction' record
def load_agg_txns(f,report_type,retd):
  """Append the aggregated transaction rows of one JSON file to `retd`.

  The file is read with load_file; every element of its 'transactionData'
  entry (in the 'data' column) carries a 'name' and a list of
  'paymentInstruments'. One output row is produced per payment instrument.

  Args:
    f: path of the JSON file.
    report_type: passed through to load_common.
    retd: dict of lists; the lists filled by load_common plus 'category',
      'stat_type', 'count' and 'amount' each grow by one per row.
  """
  # Load JSON contents into a temp dataframe
  df=load_file(f)

  # Seek to the contents of the 'transaction' data
  txn_recs = df.loc['transactionData','data']
  for txn_rec in txn_recs:
    for payment_rec in txn_rec['paymentInstruments']:
      # The path-derived fields are added once per *row*, i.e. per payment
      # instrument; doing it once per transaction record leaves the lists
      # with different lengths whenever a record has zero or several
      # instruments.
      load_common(f, report_type, retd)
      retd['category'].append(txn_rec["name"])
      retd['stat_type'].append(payment_rec["type"])
      retd['count'].append(payment_rec["count"])
      retd['amount'].append(payment_rec["amount"])

  return

def load_top_users(f,record_class,retd):
  """Append the 'top' user rows of one JSON file to `retd`.

  Takes the same (f, record_class, retd) arguments as the other loaders so
  it can be called through the loader table in load_data.

  The file is read with load_file and its 'states', 'districts' and
  'pincodes' entries (in the 'data' column) are processed independently;
  a section that is None is skipped. Each entry becomes one row: the
  path-derived fields come from load_common, geo_type/geo_name are
  overwritten with the level and the entry's 'name', and 'reg_users'
  receives the entry's 'registeredUsers'.

  Args:
    f: path of the JSON file.
    record_class: passed through to load_common.
    retd: dict of lists to append to. Lists other than the five filled
      here receive None for each row so that all lists keep one length.
  """
  # Load JSON contents into a temp dataframe
  df=load_file(f)

  sections = (
    ('State', df.loc['states','data']),
    ('District', df.loc['districts','data']),
    ('Pincode', df.loc['pincodes','data']),
  )
  filled = ('year','quarter','geo_type','geo_name','reg_users')

  for geo_type, users in sections:
    if users is None:
      continue
    for user in users:
      load_common(f, record_class, retd)
      # Replace the path-derived geography with the entry's own.
      retd['geo_type'][-1]=geo_type
      retd['geo_name'][-1]=user['name']
      retd['reg_users'].append(user['registeredUsers'])
      # Columns this loader has no value for still need a slot per row.
      for column in retd:
        if column not in filled:
          retd[column].append(None)

  return

def load_hover_users(f,record_class,retd):
  """Placeholder loader for 'hover' user files.

  Ignores its arguments, leaves `retd` untouched and returns None.
  """
  return

# Decode 'user' record
def load_agg_users(f,record_class,retd):
  """Append the aggregated user rows of one JSON file to `retd`.

  The file is read with load_file. Its 'aggregated' entry (in the 'data'
  column) supplies 'registeredUsers' and 'appOpens'; its 'usersByDevice'
  entry is a list of records with 'brand', 'count' and 'percentage'.
  One row is produced per device record, each repeating the aggregate
  figures. When 'usersByDevice' is None a single row with brand
  'Unknown', count 0 and percentage 100 is produced instead.
  """
  # Load JSON contents into a temp dataframe
  df=load_file(f)

  # Seek to the contents of the 'user.aggregated' data
  user_stat_rec = df.loc['aggregated','data']

  # Seek to the contents of the 'user.device' data
  device_recs = df.loc['usersByDevice','data']
  if device_recs is None:
    # No device breakdown in this file: emit one stand-in row.
    device_recs = [{'brand':'Unknown','count':0,'percentage':100}]

  for device_rec in device_recs:
    # Collect the common fields from path name
    load_common(f, record_class,retd)
    retd['category'].append(None)
    retd['reg_users'].append(user_stat_rec['registeredUsers'])
    retd['app_opens'].append(user_stat_rec['appOpens'])
    retd['brand'].append(device_rec['brand'])
    retd['count'].append(device_rec['count'])
    retd['percentage'].append(device_rec['percentage'])

  return

def load_top_ins(f,record_class,retd):
  """Append the 'top' insurance rows of one JSON file to `retd`.

  Defined at module level with the same (f, record_class, retd) arguments
  as the other loaders so it can be called through the loader table in
  load_data.

  The file is read with load_file and its 'states', 'districts' and
  'pincodes' entries (in the 'data' column) are processed in that order;
  a section that is None is skipped. Each entry becomes one row: the
  path-derived fields come from load_common, geo_type/geo_name are
  overwritten with the level and the entry's 'entityName', 'category' is
  None and stat_type/count/amount come from the entry's 'metric'.
  """
  # Load JSON
  df=load_file(f)

  sections = (
    ('State', df.loc['states','data']),
    ('District', df.loc['districts','data']),
    ('Pincode', df.loc['pincodes','data']),
  )

  for geo_type, entries in sections:
    if entries is None:
      continue
    for ins in entries:
      load_common(f, record_class, retd)
      # 'top' rows have no category; keep the list aligned with the others,
      # exactly as load_top_txns does.
      retd['category'].append(None)
      retd['geo_type'][-1]=geo_type
      retd['geo_name'][-1]=ins['entityName']
      retd['stat_type'].append(ins['metric']["type"])
      retd['count'].append(ins['metric']["count"])
      retd['amount'].append(ins['metric']["amount"])

  return

def load_hover_ins(f,record_class,retd):
  """Placeholder loader for 'hover' insurance files.

  Ignores its arguments, leaves `retd` untouched and returns None.
  """
  return

# Decode 'insurance' record
def load_agg_ins(f,record_class,retd):
  """Append the aggregated insurance rows of one JSON file to `retd`.

  The file is read with load_file; every element of its 'transactionData'
  entry (in the 'data' column) carries a 'name' and a list of
  'paymentInstruments'. One output row is produced per payment instrument.

  Args:
    f: path of the JSON file.
    record_class: passed through to load_common.
    retd: dict of lists; the lists filled by load_common plus 'category',
      'stat_type', 'count' and 'amount' each grow by one per row.
  """
  # Load JSON contents into a temp dataframe
  df=load_file(f)

  # Seek to the contents of the 'insurance' data
  ins_recs = df.loc['transactionData','data']

  for ins_rec in ins_recs:
    for payment_rec in ins_rec['paymentInstruments']:
      # The path-derived fields are added once per *row*, i.e. per payment
      # instrument; doing it once per insurance record leaves the lists
      # with different lengths whenever a record has zero or several
      # instruments.
      load_common(f, record_class, retd)
      retd['category'].append(ins_rec["name"])
      retd['stat_type'].append(payment_rec["type"])
      retd['count'].append(payment_rec["count"])
      retd['amount'].append(payment_rec["amount"])

  return

# Supported record types.
# Maps each record type to:
#   "columns" - the keys of the dict-of-lists that load_data() builds for it
#   "loaders" - report class -> function called as loader(f, report_class, retd)
# Entries that are commented out are currently disabled: load_data() neither
# loads them nor creates a key for them in its result.
record_types = {
  "transaction":{
    "columns":['year','quarter', 'geo_type', 'geo_name', 'category', 'stat_type', 'count', 'amount'],
    "loaders":{
      "aggregated":load_agg_txns,
      #"top":load_top_txns,
      #"hover":load_hover_txns,
    }
  },

  #"user":{
    #"columns":['year','quarter', 'geo_type', 'geo_name', 'category', 'reg_users', 'app_opens','brand','count','percentage'],
    #"loaders":{
       #"aggregated":load_agg_users,
  #       #"top":load_top_users,
  #       #"hover": load_hover_users
    #}
  #},

  #"insurance":{
    #"columns":['year','quarter', 'geo_type', 'geo_name', 'category', 'stat_type', 'count', 'amount'],
    #"loaders":{
    #"aggregated":load_agg_ins,
  #       #"top":load_top_ins,
  #       #"hover":load_hover_ins
     #}
  #}
}

# Supported record classes: report class key -> display label.
# NOTE(review): load_data() uses a loop variable that is also called
# `report_type`, so inside that function this dict is hidden and not read;
# no other code visible in this cell reads it either - confirm whether it is
# still needed.
report_type = {
    "aggregated":"Aggregated",
   # "hover": "Hover",
   # "top": "Top"
}

def get_abs_fpaths(dir):
    """Generate the absolute path of each file located anywhere under `dir`.

    NOTE: this redefines, with identical behaviour, the helper of the same
    name declared near the top of this cell; one of the two definitions is
    redundant.
    """
    for folder, _, file_names in os.walk(dir):
        for file_name in file_names:
            joined = os.path.join(folder, file_name)
            yield os.path.abspath(joined)

# Load data
def load_data(root_dir):
  """Load every configured record type / report class from disk.

  For each record type in `record_types` and each report class listed in
  its 'loaders', all files under `<root_dir>/<report class>/<record type>`
  are passed to the corresponding loader, which appends rows to a
  dict-of-lists keyed by the record type's 'columns'.

  Args:
    root_dir: folder that contains the per-report-class sub-folders.

  Returns:
    dict: ret[record type][report class] -> dict of column name -> list.
    Each inner dict can be turned into a frame with pd.DataFrame(...).
  """
  print( '''load_data() - Start''')
  # Dictionary of resulting column dictionaries
  ret = dict()

  # Running total over every record type and report class, so the summary
  # line is correct (and defined) whatever is configured.
  count = 0

  for rec_type, spec in record_types.items():
    ret[rec_type] = dict()

    # Do this for every report class - aggregated, top and map/hover
    for report_class, loader in spec['loaders'].items():
      # Build fresh lists from the configured columns. The configured list
      # itself is only read: appending to it would change `record_types`
      # for every later call.
      retd = {column: list() for column in spec['columns']}

      # Add the output columns to the output dictionary
      ret[rec_type][report_class] = retd

      # Folder containing the files for this report class and record type
      full_path = f'''{root_dir}/{report_class}/{rec_type}'''

      # Decode the contents of each file and accumulate the rows
      for f in get_abs_fpaths(full_path):
        loader(f, report_class, retd)
        count += 1

  print( f'''load_data() {count} files processed - End''')
  return ret

In [None]:
'''
  Test program to load files from disk
'''
root_dir = "/content/drive/MyDrive/Learn/guvi/labs/Assignments/PhonePePulse/phonepe/pulse/data"

ret = load_data(root_dir)

# Report the shape of every frame that was actually loaded. Indexing a fixed
# key such as ret['transaction']['top'] raises KeyError whenever that loader
# is disabled in record_types.
for rec_type, by_report_class in ret.items():
  for report_class, column_lists in by_report_class.items():
    print(rec_type, report_class, pd.DataFrame(column_lists).shape)



load_data() - Start
load_data() 925 files processed - End
(17074, 8)
