In [None]:
from pathlib import Path
import pandas as pd
import json
import datetime

# Aggregated Data Transactions

In [None]:
def Data_Transactions():
    root_dir = Path("../data/aggregated/transaction/country/india/state/")
    lis = []
    # rglob is recursive glob - which means searching files recursiverly inside the files and folders
    for i in root_dir.rglob(f"*.json"):
        with open(i, 'r') as file:
            content = file.read()
            dataset = json.loads(content)

            # i will have exact path from that <.parent> is going one step behind and <.name> is getting that subdirectory name
            state_name = i.parent.parent.name
            year = i.parent.name

            # to get the filename 1.json,2.json, 3.json and 4.json 
            quarter = i.stem
            # to change 1.json to Q1, 2.json to Q2 etc
            quarter = f'Q{quarter}'

            lis.append({'quarter':quarter,'year':year,'state':state_name,'data':dataset})

    # print(lis)

    lis2=[]
    for j in lis:
        
        for k in j['data']['data']['transactionData']:
            # I'm doing conversion rounding to two decimal point
            amount_value = k['paymentInstruments'][0]['amount']
            amount_formatted = f'{amount_value: .2f}'

            timestamp_milliseconds = j['data']['responseTimestamp']
            timestamp_seconds = timestamp_milliseconds /1000
            datetime_obj = datetime.datetime.fromtimestamp(timestamp_seconds)
            formatted_timestamp = datetime_obj.strftime('%Y-%m-%d %H:%M:%S')

            
            data = dict(quarter = j['quarter'],
                        year = j['year'],
                        state = j['state'],
                        name = k['name'],
                        type = k['paymentInstruments'][0]['type'],
                        count = k['paymentInstruments'][0]['count'],
                        amount = amount_formatted,
                        timestamp = formatted_timestamp)
            lis2.append(data)

    return lis2
    

In [128]:
dataTransaction = pd.DataFrame(Data_Transactions())
dataTransaction['year'] = dataTransaction['year'].astype(int)
dataTransaction['amount'] = dataTransaction['amount'].astype(float)
dataTransaction

Unnamed: 0,quarter,year,state,name,type,count,amount,timestamp
0,Q1,2018,andaman-&-nicobar-islands,Recharge & bill payments,TOTAL,4200,1.845307e+06,2021-09-01 18:34:47
1,Q1,2018,andaman-&-nicobar-islands,Peer-to-peer payments,TOTAL,1871,1.213866e+07,2021-09-01 18:34:47
2,Q1,2018,andaman-&-nicobar-islands,Merchant payments,TOTAL,298,4.525072e+05,2021-09-01 18:34:47
3,Q1,2018,andaman-&-nicobar-islands,Financial Services,TOTAL,33,1.060142e+04,2021-09-01 18:34:47
4,Q1,2018,andaman-&-nicobar-islands,Others,TOTAL,256,1.846899e+05,2021-09-01 18:34:47
...,...,...,...,...,...,...,...,...
3949,Q2,2023,west-bengal,Merchant payments,TOTAL,245111000,1.767046e+11,2023-08-21 17:30:57
3950,Q2,2023,west-bengal,Peer-to-peer payments,TOTAL,240347041,7.970548e+11,2023-08-21 17:30:57
3951,Q2,2023,west-bengal,Recharge & bill payments,TOTAL,58950434,3.478924e+10,2023-08-21 17:30:57
3952,Q2,2023,west-bengal,Financial Services,TOTAL,327537,3.174670e+08,2023-08-21 17:30:57


In [None]:
import mysql.connector
from tabulate import tabulate

In [113]:
db = mysql.connector.connect(
    host ='localhost',
    user = 'root',
    password ='balaji',
    database = 'pulse'
)

mycursor = db.cursor(buffered=True)
mycursor
db.commit()

In [None]:
mycursor.execute("create database if not exists pulse")

In [None]:
mycursor.execute("show databases")

out = mycursor.fetchall()
print(tabulate(out, [i[0] for i in mycursor.description], tablefmt='psql'))

In [None]:
mycursor.execute("""create table if not exists aggDataTrans (
                 aggDataTransId int auto_increment primary key,quarter varchar(10), year int(10), 
                 state varchar(255), name varchar(255), type varchar(10), count int(255), amount float,
                 timestamp datetime)
                 """)

In [None]:
sql = ("""insert into aggDataTrans(aggDataTransId ,quarter ,year ,state ,name, type, count, amount, 
        timestamp) values(%s,%s,%s,%s,%s,%s,%s,%s,%s)
        on duplicate key update
        quarter = values(quarter), year = values(year), state = values(state), name = values(name), 
        type = values(type), count = values(count), amount = values(amount),
        timestamp = values(timestamp)""")

for i in dataTransaction.to_records().tolist():
    mycursor.execute(sql,i)

In [None]:
db.commit()

In [None]:
mycursor.execute("select * from aggdatatrans limit 20")

out = mycursor.fetchall()
print(tabulate(out, [i[0] for i in mycursor.description], tablefmt='psql'))

In [None]:
mycursor.execute("show tables")

out = mycursor.fetchall()
print(tabulate(out, [i[0] for i in mycursor.description], tablefmt='psql'))

# Aggregated Data Users

In [None]:
def Data_Users():
    root_dir1 = Path("../data/aggregated/user/country/india/state")

    lis = []
    for i in root_dir1.rglob("*.json"):
        # print(i)
        with open(i, 'r') as file:
            content = file.read()
            dataset = json.loads(content)

            state = i.parent.parent.name
            year = i.parent.name
            #  to get 1.josn,2.json,3.json and 4.json
            quarter = i.stem
            #  to convert 1.json to Q1, 2.json to Q2 etc.,
            quarter = f'Q{quarter}'

            lis.append({'quarter':quarter, 'year': year, 'state' : state, 'data': dataset})

    # print(lis)

    lis2 = []
    for j in lis:
        users_by_device = j['data']['data']['usersByDevice']
        if users_by_device is None:
            continue  # this will skip if there is no data
        
        for k in users_by_device:
            # percentage format
            percentage_value = k.get('percentage', None)
            percentage_formated = f'{percentage_value: .2f}'

            data = dict(
                        quarter = j['quarter'],
                        year = j['year'],
                        state =j['state'],
                        registeredUsers = j['data']['data']['aggregated'].get('registeredUsers', None),
                        appOpens = j['data']['data']['aggregated'].get('appOpens', None),
                        brand = k.get('brand', None),
                        count = k.get('count', None),
                        percentage = percentage_formated,
                        timestamp = j['data']['responseTimestamp']
                        )
            lis2.append(data)

    return lis2


In [None]:
DataUsers = pd.DataFrame(Data_Users())
DataUsers['year'] = DataUsers['year'].astype(int)
DataUsers['percentage'] = DataUsers['percentage'].astype(float)
DataUsers['timestamp'] = pd.to_datetime(DataUsers['timestamp'], unit='ms')
DataUsers['timestamp'] = DataUsers['timestamp'].dt.strftime("%Y-%m-%d %H:%M:%S")
DataUsers

In [119]:
DataUsers.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6732 entries, 0 to 6731
Data columns (total 9 columns):
 #   Column           Non-Null Count  Dtype  
---  ------           --------------  -----  
 0   quarter          6732 non-null   object 
 1   year             6732 non-null   int32  
 2   state            6732 non-null   object 
 3   registeredUsers  6732 non-null   int64  
 4   appOpens         6732 non-null   int64  
 5   brand            6732 non-null   object 
 6   count            6732 non-null   int64  
 7   percentage       6732 non-null   float64
 8   timestamp        6732 non-null   object 
dtypes: float64(1), int32(1), int64(3), object(4)
memory usage: 447.2+ KB


In [122]:
# mycursor.execute("drop table aggDataUsers")

In [123]:
mycursor.execute("""create table if not exists aggDataUsers (
                 aggDataUsersId int auto_increment primary key, quarter varchar(10), year int(10), 
                 state varchar(255), registeredUsers int(255), appOpens bigint, brand varchar(255),
                 count int(255), percentage float, timestamp datetime)""")

In [124]:
mycursor.execute("show tables")

out = mycursor.fetchall()
print(tabulate(out, [i[0] for i in mycursor.description], tablefmt='psql'))

+-------------------+
| Tables_in_pulse   |
|-------------------|
| aggdatatrans      |
| aggdatausers      |
+-------------------+


In [125]:
sql = ("""insert into aggdatausers (aggDataUsersId, quarter, year, state, registeredUsers, appOpens, brand
       , count, percentage, timestamp) values(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
       on duplicate key update
       quarter = values(quarter), year = values(year), state = values(state), 
       registeredUsers = values(registeredUsers), appOpens = values(appOpens), brand = values(brand),
       count = values(count), percentage = values(percentage), timestamp = values(timestamp)""")

for i in DataUsers.to_records().tolist():
    mycursor.execute(sql,i)

In [126]:
db.commit()

In [127]:
mycursor.execute("select * from aggdatausers limit 10")

out = mycursor.fetchall()
print(tabulate(out, [i[0] for i in mycursor.description], tablefmt='psql'))

+------------------+-----------+--------+---------------------------+-------------------+------------+----------+---------+--------------+---------------------+
|   aggDataUsersId | quarter   |   year | state                     |   registeredUsers |   appOpens | brand    |   count |   percentage | timestamp           |
|------------------+-----------+--------+---------------------------+-------------------+------------+----------+---------+--------------+---------------------|
|                1 | Q1        |   2018 | andaman-&-nicobar-islands |              6740 |          0 | Samsung  |    1445 |         0.21 | 2021-09-01 13:04:54 |
|                2 | Q1        |   2018 | andaman-&-nicobar-islands |              6740 |          0 | Vivo     |     982 |         0.15 | 2021-09-01 13:04:54 |
|                3 | Q1        |   2018 | andaman-&-nicobar-islands |              6740 |          0 | Oppo     |     501 |         0.07 | 2021-09-01 13:04:54 |
|                4 | Q1        |  