In [1]:
import pandas as pd
import mysql.connector


In [2]:
def read_data_from_path(path):
    df = pd.read_json(path, lines=True)
    df = pd.DataFrame.from_records(df['_source'])
    df = df[['Contract','TotalDuration','AppName']]
    return df

In [3]:
def process_category_data(df):
    df['Category'] = df['AppName'].apply(lambda x: 'The thao' if x in ['KPLUS','SPORT']
                                            else('Phim' if x in ['FIMS','VOD']
                                            else('Thu gian' if x == 'RELAX'
                                            else('Cho tre em' if x =='CHILD'
                                            else('Truyen hinh' if x =='CHANNEL'
                                            else('Others'))))))
    return df

In [4]:
def process_summary_data(df):
    df = df[['Contract','TotalDuration','Category']]
    df = pd.pivot_table(df, index='Contract', columns='Category',values='TotalDuration', aggfunc=sum).reset_index()
    df = df.rename_axis(None, axis=1)
    df.fillna(0,inplace=True)
    return df

In [5]:
def import_data_into_database(df):
    cnx = mysql.connector.connect(user='root',
                              password='',
                              host='localhost',
                              database='test_data')
    cursor = cnx.cursor()
    query = """ INSERT INTO user_statistic (`Contract`,`Cho tre em`,`Phim`,`The Thao`, `Thu gian` , `Truyen Hinh`) VALUES (%s,%s,%s,%s,%s,%s)"""
    test = df[0:1000]
    cursor.executemany(query, test.values.tolist())
    return df
    

In [10]:
def main_task():
    print('-------------------')
    print('Read Data From Path')
    path = 'D:\\python-project\\ETL\\20220401.json'
    df = read_data_from_path(path)
    print(df.head(5))
    print('-------------------')
    print('Category Data processing')
    df = process_category_data(df)
    print(df.head(5))
    print('-------------------')
    print('Summary Data processing')
    print('-------------------')
    df = process_summary_data(df)
    print(df.head(5))
    print('Importing Data')
    print('-------------------')
    import_data_into_database(df)
    return print('Task finished')

In [11]:
main_task()

-------------------
Read Data From Path
    Contract  TotalDuration AppName
0  HNH579912            254   KPLUS
1  HUFD40665           1457   KPLUS
2  HNH572635           2318   KPLUS
3  HND141717           1452   KPLUS
4  HNH743103            251   KPLUS
-------------------
Category Data processing
    Contract  TotalDuration AppName  Category
0  HNH579912            254   KPLUS  The thao
1  HUFD40665           1457   KPLUS  The thao
2  HNH572635           2318   KPLUS  The thao
3  HND141717           1452   KPLUS  The thao
4  HNH743103            251   KPLUS  The thao
-------------------
Summary Data processing
-------------------
         Contract  Cho tre em  Phim   The thao  Thu gian  Truyen hinh
0               0         0.0   0.0  3667070.0       0.0  515879796.0
1  113.182.209.48         0.0   0.0        0.0      89.0         63.0
2       AGAAA0335         0.0   0.0        0.0       0.0      11440.0
3       AGAAA0338         0.0   0.0        0.0       0.0       8895.0
4       A