In [1]:
import pandas as pd
import numpy as np
import datetime as dt
from sqlalchemy import create_engine
import pymysql
import os.path

In [161]:
class extract:
    """Extract one source-system CSV into the raw MySQL table and the data lake.

    ``./source_system/<source_file_name>.csv`` is read with ``Order Date`` and
    ``Ship Date`` parsed as datetimes, its 21 columns are renamed to the
    snake_case names in ``self.columns`` and exact duplicate rows are removed.
    :meth:`extractData` then loads the rows not yet present in
    ``customer_order_raw.row_data``.
    """

    # Base SQLAlchemy URL without the database name. Set the
    # CUSTOMER_ORDER_MYSQL_URL environment variable to avoid relying on the
    # hard-coded development credentials below.
    defaultDbUrl = 'mysql+pymysql://root:root@127.0.0.1'
    # Append-only data-lake file that the transform step reads.
    lakeFile = './data_lake/customer_order.csv'

    def __init__(self, source_file_name=None):
        """Read and de-duplicate ``./source_system/<source_file_name>.csv``.

        Args:
            source_file_name: file name (without ``.csv``) inside ``./source_system``.

        Raises:
            TypeError: if ``source_file_name`` is not given.
        """
        # The previous default was the *type* ``str``, which silently produced
        # the path "./source_system/<class 'str'>.csv"; fail fast instead.
        if source_file_name is None:
            raise TypeError("extract() requires a source_file_name")
        self.tableName = 'row_data'
        self.databaseName = 'customer_order_raw'
        self.source_file_name = source_file_name
        self.dataFrame = pd.read_csv(f'./source_system/{self.source_file_name}.csv',
                                     parse_dates=['Order Date', 'Ship Date'])
        self.columns = ['row_id', 'order_id', 'order_date', 'ship_date', 'ship_mode', 'customer_id', 'customer_name', 'segment', 'country', 'city',
                        'state', 'postal_code', 'region', 'product_id', 'category', 'sub_category', 'product_name', 'sales', 'quantity', 'discount', 'profit']
        self.dataFrame.columns = self.columns
        # drop_duplicates rather than groupby(all columns).count(): groupby
        # discards any row with a missing value in a key column (e.g. an empty
        # postal_code), which silently lost those orders.
        self.dataFrame = self.dataFrame.drop_duplicates().reset_index(drop=True)

    def _engine(self):
        """Return a SQLAlchemy engine bound to ``self.databaseName``."""
        baseUrl = os.environ.get('CUSTOMER_ORDER_MYSQL_URL', self.defaultDbUrl)
        return create_engine(f'{baseUrl}/{self.databaseName}')

    def extractData(self):
        """Insert rows not yet in the raw table, then append them to the data lake.

        ``self.dataFrame`` is narrowed to rows whose ``row_id`` is absent from
        ``self.tableName``. Those rows are appended to the table and, only if
        that insert did not raise ``ValueError``, to :attr:`lakeFile` (with a
        header only when the file does not exist yet).
        """
        sqlEngine = self._engine()
        # Only row_id is needed for the anti-join; the connection is always released.
        with sqlEngine.connect() as dbConnection:
            existing = pd.read_sql(f'select row_id from {self.tableName}', con=dbConnection)
        self.dataFrame = self.dataFrame.loc[~self.dataFrame['row_id'].isin(existing['row_id'])]

        try:
            self.dataFrame.to_sql(self.tableName, con=sqlEngine, if_exists='append', index=False)
        except ValueError as ex:
            # Still best effort (report and continue), but skip the lake append:
            # these rows are not in the raw table, so they will be offered again
            # on the next run and appending them now would duplicate them.
            print(ex)
            return

        header_flag = not os.path.isfile(self.lakeFile)
        self.dataFrame.to_csv(self.lakeFile, mode='a', index=False, header=header_flag)

    def getLakeDataFrame(self):
        """Return the full contents of the raw MySQL table ``self.tableName``.

        NOTE(review): despite its name this reads the raw database table, not
        the data-lake CSV file.
        """
        with self._engine().connect() as dbConnection:
            return pd.read_sql(f'select * from {self.tableName}', con=dbConnection)


In [179]:
# Pull the 'customer_order2' export from ./source_system into the raw table and data lake.
extract_ins = extract(source_file_name='customer_order2')
extract_ins.extractData()

In [152]:
class transform:
    """Build warehouse-ready frames from the data-lake CSV and diff them against MySQL.

    On construction ``./data_lake/customer_order.csv`` is read once and split into

    * ``df_customer`` - distinct customer attribute rows, ``status='active'``;
    * ``df_product``  - distinct product attribute rows, ``status='active'``;
    * ``df_date``     - distinct order/ship dates with a YYYYMMDD integer ``id``;
    * ``df_order``    - every lake row, with ``order_date``/``ship_date``
      replaced by their YYYYMMDD ids.

    The ``getNew*`` methods read the matching table of the ``customer_order``
    database and return only the rows that still need to be loaded.
    """

    # Base SQLAlchemy URL without the database name; override it with the
    # CUSTOMER_ORDER_MYSQL_URL environment variable instead of relying on the
    # hard-coded development credentials.
    defaultDbUrl = 'mysql+pymysql://root:root@127.0.0.1'

    def formatDateId(self, date):
        """Return ``date`` (any object with year/month/day) as the int YYYYMMDD.

        Example: 2017-03-05 -> 20170305.
        """
        return date.year * 10000 + date.month * 100 + date.day

    def formatDate(self, date):
        """Inverse of :meth:`formatDateId`: convert a YYYYMMDD value to ``datetime.date``."""
        text = str(date)
        return dt.date(int(text[0:4]), int(text[4:6]), int(text[6:8]))

    def __init__(self):
        """Read the data-lake CSV and derive the dimension and fact frames."""
        self.source_file_name = 'customer_order'
        self.databaseName = 'customer_order'
        df_lake = pd.read_csv(
            f'./data_lake/{self.source_file_name}.csv', parse_dates=['order_date', 'ship_date'])

        self.tableName_customer = 'dim_customer'
        self.costomer_columns = ['customer_id', 'customer_name', 'segment', 'country', 'city',
                                 'state', 'postal_code', 'region']
        self.df_customer = self._distinct(df_lake, self.costomer_columns)
        self.df_customer['status'] = 'active'

        self.tableName_product = 'dim_product'
        self.product_columns = ['product_id',
                                'category', 'sub_category', 'product_name']
        self.df_product = self._distinct(df_lake, self.product_columns)
        self.df_product['status'] = 'active'

        self.tableName_date = 'dim_date'
        # Series.append was removed in pandas 2.0; pd.concat is the replacement.
        all_dates = pd.concat([df_lake['order_date'], df_lake['ship_date']], ignore_index=True)
        self.df_date = self._distinct(
            pd.DataFrame({'id': all_dates.apply(self.formatDateId), 'date': all_dates}),
            ['id', 'date'])

        self.tableName_order = 'fact_order'
        self.df_order = df_lake.copy()
        for column in ('order_date', 'ship_date'):
            self.df_order[column] = self.df_order[column].apply(self.formatDateId)

    @staticmethod
    def _distinct(frame, columns):
        """Return the distinct rows of ``frame[columns]``, sorted by ``columns``, with a fresh index.

        Unlike ``groupby(columns).count()``, rows with missing values are kept.
        """
        return (frame[columns]
                .drop_duplicates()
                .sort_values(columns)
                .reset_index(drop=True))

    @staticmethod
    def _newRows(candidates, existing, keys):
        """Return the rows of ``candidates`` whose ``keys`` values are absent from ``existing``.

        Multi-column keys are compared as tuples (jointly, not column by
        column). When ``existing`` has no rows, ``candidates`` itself is returned.
        """
        if len(existing) == 0:
            return candidates
        if len(keys) == 1:
            is_new = ~candidates[keys[0]].isin(existing[keys[0]])
        else:
            known = pd.MultiIndex.from_frame(existing[keys])
            is_new = ~pd.MultiIndex.from_frame(candidates[keys]).isin(known)
        return candidates.loc[is_new].copy()

    def _readTables(self, *tableNames):
        """Read each named table of ``self.databaseName`` in full over one connection.

        Returns a list of DataFrames in the order of ``tableNames``; the
        connection is released even if a query fails.
        """
        baseUrl = os.environ.get('CUSTOMER_ORDER_MYSQL_URL', self.defaultDbUrl)
        sqlEngine = create_engine(f'{baseUrl}/{self.databaseName}')
        with sqlEngine.connect() as dbConnection:
            return [pd.read_sql(f'select * from {name}', con=dbConnection) for name in tableNames]

    def getNewDateData(self):
        """Return the ``df_date`` rows whose ``id`` is not yet in ``dim_date``."""
        (dim_date,) = self._readTables(self.tableName_date)
        return self._newRows(self.df_date, dim_date, ['id'])

    def getNewCustomerData(self):
        """Return ``df_customer`` rows whose (customer_id, customer_name) pair is not in ``dim_customer``.

        The pair is matched jointly; testing each column independently treated
        an unseen combination of a known id and a known name as already loaded.
        NOTE(review): other attribute changes (e.g. a new city) still count as
        existing, while getNewOrderDataSql joins on all customer columns.
        """
        (dim_customer,) = self._readTables(self.tableName_customer)
        return self._newRows(self.df_customer, dim_customer, ['customer_id', 'customer_name'])

    def getNewProductData(self):
        """Return ``df_product`` rows whose (product_id, product_name) pair is not in ``dim_product``."""
        (dim_product,) = self._readTables(self.tableName_product)
        return self._newRows(self.df_product, dim_product, ['product_id', 'product_name'])

    def getNewOrderData(self):
        """Return lake orders whose ``row_id`` is not in ``fact_order``, minus customer/product attributes.

        NOTE(review): the dropped columns include ``customer_id`` and
        ``product_id``, so this frame has no link to either dimension; confirm intended.
        """
        (fact_order,) = self._readTables(self.tableName_order)
        new_orders = self._newRows(self.df_order, fact_order, ['row_id'])
        return new_orders.drop(self.costomer_columns + self.product_columns, axis=1)

    def getNewOrderDataSql(self):
        """Return new fact rows whose customer/product attributes are replaced by dimension ids.

        Each lake order is left-joined to ``dim_customer`` on all customer
        columns and to ``dim_product`` on all product columns. The attribute
        columns and each dimension's ``status`` are dropped, and each
        dimension's ``id`` is renamed to ``customer_id`` / ``product_id``
        (missing when no dimension row matches). Orders whose ``row_id`` is
        already in ``fact_order`` are excluded.
        """
        dim_customer, dim_product, fact_order = self._readTables(
            self.tableName_customer, self.tableName_product, self.tableName_order)

        # Drop lists are built locally: the old code appended 'status' to
        # self.costomer_columns / self.product_columns, so any later call
        # (including getNewOrderData) failed with a KeyError.
        orders = (self.df_order
                  .merge(dim_customer, how='left', on=self.costomer_columns)
                  .drop(self.costomer_columns + ['status'], axis=1)
                  .rename({'id': 'customer_id'}, axis=1)
                  .merge(dim_product, how='left', on=self.product_columns)
                  .drop(self.product_columns + ['status'], axis=1)
                  .rename({'id': 'product_id'}, axis=1))
        return self._newRows(orders, fact_order, ['row_id'])


In [175]:
class load:
    """Load transform output into warehouse CSV snapshots and the MySQL warehouse.

    Every non-empty table append done by :meth:`loadDatabase` is recorded as a
    row in ``metadata.logger``.
    """

    # Base SQLAlchemy URL without the database name; override it with the
    # CUSTOMER_ORDER_MYSQL_URL environment variable instead of relying on the
    # hard-coded development credentials.
    defaultDbUrl = 'mysql+pymysql://root:root@127.0.0.1'

    def __init__(self):
        self.databaseName = 'customer_order'
        self.tableName_order = 'fact_order'
        self.tableName_date = 'dim_date'
        self.tableName_customer = 'dim_customer'
        self.tableName_product = 'dim_product'

        self.metadataDBName = 'metadata'
        self.tableName_logger = 'logger'

    def _engine(self, databaseName):
        """Return a SQLAlchemy engine bound to ``databaseName``."""
        baseUrl = os.environ.get('CUSTOMER_ORDER_MYSQL_URL', self.defaultDbUrl)
        return create_engine(f'{baseUrl}/{databaseName}')

    @staticmethod
    def _appendCsv(frame, path):
        """Append ``frame`` (without index) to ``path``; write a header only if ``path`` does not exist.

        Empty frames are skipped entirely, so no header-only file is created.
        """
        if len(frame) == 0:
            return
        frame.to_csv(path, mode='a', index=False, header=not os.path.isfile(path))

    def loadCsv(self):
        """Append new customers, products and orders to the ``./data_warehouse`` CSV files.

        "New" is decided by the transform ``getNew*`` methods against the MySQL
        warehouse, so calling this twice before :meth:`loadDatabase` appends
        the same rows twice.
        """
        transform_ins = transform()
        # Collect everything first so a failing query leaves no file half-updated.
        newRowsByPath = {
            './data_warehouse/customer.csv': transform_ins.getNewCustomerData(),
            './data_warehouse/product.csv': transform_ins.getNewProductData(),
            './data_warehouse/order.csv': transform_ins.getNewOrderData(),
        }
        for path, frame in newRowsByPath.items():
            self._appendCsv(frame, path)

    def loadDatabase(self):
        """Append new date, customer, product and order rows to the warehouse and log each load.

        The fact step runs last because getNewOrderDataSql reads the dimension
        tables to resolve customer/product ids. Steps with no new rows are
        skipped and not logged. Both connections are closed even if a step fails.
        """
        transform_ins = transform()
        # (target table, producer of new rows, logger detail). Producers are
        # called lazily inside the loop, in this order. The three non-date
        # messages previously repeated the date text ("ngày tháng").
        steps = [
            (self.tableName_date, transform_ins.getNewDateData,
             'Load dữ liệu ngày tháng vào dim_date'),
            (self.tableName_customer, transform_ins.getNewCustomerData,
             'Load dữ liệu khách hàng vào dim_customer'),
            (self.tableName_product, transform_ins.getNewProductData,
             'Load dữ liệu sản phẩm vào dim_product'),
            (self.tableName_order, transform_ins.getNewOrderDataSql,
             'Load dữ liệu đơn hàng vào fact_order'),
        ]
        with self._engine(self.databaseName).connect() as dbConnection:
            with self._engine(self.metadataDBName).connect() as dbConnection_metalog:
                for tableName, getNewRows, detail in steps:
                    newRows = getNewRows()
                    if len(newRows) == 0:
                        continue
                    newRows.to_sql(name=tableName, con=dbConnection,
                                   if_exists='append', index=False)
                    logRow = pd.DataFrame({'detail': [detail], 'number_row_affected': [len(newRows)]})
                    logRow.to_sql(name=self.tableName_logger, con=dbConnection_metalog,
                                  index=False, if_exists='append')


In [178]:
# Refresh the warehouse CSV snapshots first, then load the new rows into MySQL.
load_ins = load()
load_ins.loadCsv()       # ./data_warehouse/*.csv
load_ins.loadDatabase()  # dimensions, fact table and metadata logger

In [2]:
# Scratch check: write a tiny frame to JSON so the next cell can read it back.
df = pd.DataFrame(data={'A': [1, 2, 3], 'B': [4, 5, 6]})
df.to_json(path_or_buf='./test_json.json')

In [None]:
# Scratch check: read back the JSON written above and preview it.
df1 = pd.read_json(path_or_buf='./test_json.json')
df1.head(5)