Imports

In [15]:
import pandas as pd
import numpy as np
import sqlite3
from datetime import date

In [16]:
#loading_branch_service
df_branch_service = pd.read_json("branch_service_transaction_info.json")

#loading_customer_transaction
df_customer_transaction = pd.read_json("customer_transaction_info.json")

In [17]:
#drop_dups_branch_service
df_branch_service = df_branch_service.drop_duplicates(subset=['txn_id'])

#drop_dups_customer_transaction
df_customer_transaction = df_customer_transaction.drop_duplicates(subset=['txn_id'])

In [18]:
#merge_dataframe
df_merged = pd.merge(df_customer_transaction, df_branch_service)

In [19]:
#fill_branch_name
df_merged['branch_name'] = df_merged.replace('',np.nan).groupby('txn_id')['branch_name'].transform('first')
df_merged['branch_name'] = df_merged['branch_name'].ffill().bfill()

In [20]:
#fill_price
df_merged['price'] = df_merged['price'].fillna(df_merged.groupby(['branch_name','service'])['price'].transform('mean'))

In [21]:
#standardize_last_name
df_merged['last_name'] = df_merged['last_name'].str.replace('\W', '', regex=True)
df_merged['last_name'] = df_merged['last_name'].str.upper()

In [22]:
#standardize_first_name
df_merged['first_name'] = df_merged['first_name'].str.replace('\W', '', regex=True)
df_merged['first_name'] = df_merged['first_name'].str.upper()

In [23]:
#validate_dates
today = str(date.today())
df_merged['avail_date'] = pd.to_datetime(df_merged['avail_date'], format='%Y-%m-%d')
df_merged['birthday'] = pd.to_datetime(df_merged['birthday'], format='%Y-%m-%d')
df_merged = df_merged[(df_merged['avail_date'] <= today) & (df_merged['birthday'] <= today)]
df_merged = df_merged[(df_merged['avail_date'] > df_merged['birthday'])]

In [24]:
#validate_price
df_merged['price'] = df_merged['price'].round(2)

In [25]:
import psycopg2
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

In [26]:
dbname = "postgres"
user ="postgres"
password ="1425"
host = "localhost"
port = "5432"

flag = False

conn = psycopg2.connect(dbname=dbname, user=user, password=password, host=host, port=port)
conn.autocommit = True

cur = conn.cursor()

cur.execute(sql.SQL("SELECT 1 FROM pg_database WHERE datname = {}").format(sql.Literal("Transaction")))
exists = cur.fetchone()

if not exists:
    cur.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier("Transaction")))

conn.commit()

In [27]:
from sqlalchemy import create_engine, text, inspect

engine = create_engine('postgresql://postgres:1425@localhost:5432/Transaction')

inspector = inspect(engine)
table_exists = inspector.has_table('txn')

if not table_exists:
    with engine.connect() as connection:
        connection.execute(text("DROP VIEW IF EXISTS weekly_report CASCADE"))

    df_merged.to_sql('txn', con=engine, if_exists='replace', index=False)
    print("Table 'txn' created.")
else:
    print("Table 'txn' already exists. Skipping creation.")

Table 'txn' already exists. Skipping creation.


In [28]:
dbname2 = "Transaction"

conn2 = psycopg2.connect(dbname=dbname2, user=user, password=password, host=host, port=port)
conn2.autocommit = True

cur2 = conn2.cursor()


cur2.execute("""
CREATE OR REPLACE VIEW weekly_report AS 
SELECT
    EXTRACT(WEEK FROM avail_date) AS week_of_year,
    service,
    ROUND(SUM(price)::numeric, 2) AS weekly_sales
FROM
    txn 
GROUP BY
    week_of_year,
    service 
ORDER BY
    week_of_year ASC, service ASC;
""")


cur2.close()
conn2.close()
cur.close()
conn.close()