In [107]:
# import depenendencies
from google.cloud import bigquery
import pandas as pd

In [108]:
# project id
project_id = 'dataengongcp'

In [109]:
# instantiate bigquery client
client = bigquery.Client()

In [110]:

# read, format and save csv to parquet
df_sales_15 = pd.read_csv('gs://lpkadvworks/data/AW_Sales/AdventureWorks_Sales_2015.csv')


In [113]:
df_ord_qty = df_sales_15['ProductKey'].unique()
df_ord_qty

array([332, 312, 350, 338, 310, 314, 345, 313, 351, 344, 326, 348, 311,
       324, 342, 349, 346, 328, 334, 347, 320, 330, 322, 340, 336, 371,
       373, 354, 362, 385, 358, 377, 381, 379, 369, 370, 368, 352, 375,
       387, 360, 356, 383, 389], dtype=int64)

In [115]:
# uri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv"
# pd.read_csv(uri)

In [41]:
def format_and_to_parquet(csv_uri, parquet_uri):
    df = pd.read_csv(csv_uri)
    date_cols = ['OrderDate', 'StockDate']
    for col in date_cols:
        df[col] = df[col].str.replace('/', '-')
        df[col] = pd.to_datetime(df[col])
    df.to_parquet(parquet_uri, compression='snappy', index=False)

In [57]:
def format_and_to_csv(csv_uri, csv_formatted_uri):
    df = pd.read_csv(csv_uri)
    date_cols = ['OrderDate', 'StockDate']
    for col in date_cols:
        df[col] = df[col].str.replace('/', '-')
        df[col] = pd.to_datetime(df[col])
    df.to_csv(f'gs://lpkadvworks/data/AW_Sales/AdventureWorks_Sales_{csv_formatted_uri}_2015.csv', index=False)

In [58]:
csv_uri = 'gs://lpkadvworks/data/AW_Sales/AdventureWorks_Sales_2015.csv'
parquet_uri = 'gs://lpkadvworks/data/AW_Sales/AdventureWorks_Sales_2015.parquet'
csv_formatted_uri = 'formatted'

In [71]:
format_and_to_csv(csv_uri, csv_formatted_uri)

In [72]:
!gsutil ls -r gs://lpkadvworks/data/AW_Sales

gs://lpkadvworks/data/AW_Sales/:
gs://lpkadvworks/data/AW_Sales/AdventureWorks_Sales_2015.csv
gs://lpkadvworks/data/AW_Sales/AdventureWorks_Sales_2015.parquet
gs://lpkadvworks/data/AW_Sales/AdventureWorks_Sales_2015.snappy.parquet
gs://lpkadvworks/data/AW_Sales/AdventureWorks_Sales_2016.csv
gs://lpkadvworks/data/AW_Sales/AdventureWorks_Sales_2017.csv
gs://lpkadvworks/data/AW_Sales/AdventureWorks_Sales_formatted_2015.csv


In [122]:
sales_2015_formatted_df = pd.read_csv('gs://lpkadvworks/data/AW_Sales/AdventureWorks_Sales_formatted_2015.csv')

In [128]:

date_cols = ['OrderDate', 'StockDate']
for col in date_cols:
    sales_2015_formatted_df[col] = sales_2015_formatted_df[col].str.replace('/', '-')
    sales_2015_formatted_df[col] = pd.to_datetime(sales_2015_formatted_df[col])

In [129]:
sales_2015_formatted_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2630 entries, 0 to 2629
Data columns (total 8 columns):
 #   Column         Non-Null Count  Dtype         
---  ------         --------------  -----         
 0   OrderDate      2630 non-null   datetime64[ns]
 1   StockDate      2630 non-null   datetime64[ns]
 2   OrderNumber    2630 non-null   object        
 3   ProductKey     2630 non-null   int64         
 4   CustomerKey    2630 non-null   int64         
 5   TerritoryKey   2630 non-null   int64         
 6   OrderLineItem  2630 non-null   int64         
 7   OrderQuantity  2630 non-null   int64         
dtypes: datetime64[ns](2), int64(5), object(1)
memory usage: 164.5+ KB


In [138]:
sales_2015_formatted_df.to_gbq(f'{project_id}.advworks.stage_sales', project_id)

In [153]:
def format_to_gbq(csv_uri, project_id, gbq_table):
    df = pd.read_csv(csv_uri)
    date_cols = ['OrderDate', 'StockDate']
    for col in date_cols:
        df[col] = df[col].str.replace('/', '-')
        df[col] = pd.to_datetime(df[col])
    df.to_gbq(gbq_table, project_id, if_exists='replace')

In [142]:
sales_2015_uri = 'gs://lpkadvworks/data/AW_Sales/AdventureWorks_Sales_2015.csv'
gbq_table_id = f'{project_id}.advworks.stage_sales'

In [144]:
# create sales table
schema=[
        bigquery.SchemaField('OrderDate', 'TIMESTAMP'),
        bigquery.SchemaField('StockDate', 'TIMESTAMP'),
        bigquery.SchemaField('OrderNumber', 'STRING'),
        bigquery.SchemaField('ProductKey', 'INTEGER'),
        bigquery.SchemaField('CustomerKey', 'INTEGER'),
        bigquery.SchemaField('TerritoryKey', 'INTEGER'),
        bigquery.SchemaField('OrderLineItem', 'INTEGER'),
        bigquery.SchemaField('OrderQuantity', 'INTEGER'),
    ]
table_id = 'dataengongcp.advworks.sales'
table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table)  # Make an API request.
print(
    "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)
)

Created table dataengongcp.advworks.sales


<google.cloud.bigquery.table._EmptyRowIterator at 0x29b35beb100>

In [156]:
# function to upsert data from staging table to main table
def merge_stage_into_sales():
    # merge into sales table
    merge_query = f'''
        MERGE INTO `{project_id}.advworks.sales` AS s
        USING `{project_id}.advworks.stage_sales` AS ss
        ON s.OrderNumber = ss.OrderNumber
        WHEN MATCHED THEN
            UPDATE SET s.OrderDate = ss.OrderDate,
                        s.StockDate = ss.StockDate,
                        s.ProductKey = ss.ProductKey,
                        s.CustomerKey = ss.CustomerKey,
                        s.TerritoryKey = ss.TerritoryKey,
                        s.OrderLineItem = ss.OrderLineItem,
                        s.OrderQuantity = ss.OrderQuantity
        WHEN NOT MATCHED THEN
            INSERT (OrderDate, StockDate, OrderNumber, ProductKey, CustomerKey, TerritoryKey, OrderLineItem, OrderQuantity)
            VALUES (ss.OrderDate, ss.StockDate, ss.OrderNumber, ss.ProductKey, ss.CustomerKey, ss.TerritoryKey, ss.OrderLineItem, ss.OrderQuantity)
    '''
    job_config = bigquery.QueryJobConfig()
    job_config.priority = bigquery.QueryPriority.BATCH
    query_job = client.query(merge_query, job_config=job_config)
    query_job.result()

In [152]:
# format_to_gbq(sales_2015_uri, project_id, gbq_table_id)
df_16 = pd.read_csv('gs://lpkadvworks/data/AW_Sales/AdventureWorks_Sales_2016.csv')
df_16.head()

Unnamed: 0,OrderDate,StockDate,OrderNumber,ProductKey,CustomerKey,TerritoryKey,OrderLineItem,OrderQuantity
0,1/1/2016,10/17/2002,SO48797,385,14335,1,1,1
1,1/1/2016,9/30/2002,SO48802,383,24923,9,1,1
2,1/1/2016,11/29/2002,SO48801,326,15493,1,1,1
3,1/1/2016,11/16/2002,SO48799,352,26708,4,1,1
4,1/1/2016,12/16/2002,SO48798,369,23332,9,1,1


In [154]:
sales_2016_uri = 'gs://lpkadvworks/data/AW_Sales/AdventureWorks_Sales_2016.csv'
gbq_table_id = f'{project_id}.advworks.stage_sales'
format_to_gbq(sales_2016_uri, project_id, gbq_table_id)

In [157]:
merge_stage_into_sales()

In [158]:
def print_dataset_details(project_id):
    datasets = list(client.list_datasets())
    counter = 0
    table_counter = 0
    print(f'There are {len(datasets)} datasets in {project_id.upper()} project')
    for dataset in datasets:
        counter += 1
        print(f'dataset {counter}: {dataset.dataset_id}')
        print('............................................')
        tables = list(client.list_tables(dataset))
        print(f'There are {len(tables)} tables in {dataset.dataset_id} dataset')
        for table in tables:
            table_counter += 1
            print(f'table {table_counter}: {table.table_id}')

In [None]:
# perform test query
QUERY = (
    f'SELECT * FROM `{project_id}.retail.orders`'
    'LIMIT 10'
)
query_job = client.query(QUERY)
rows = query_job.result()

In [None]:
for row in rows:
    print(row.values())

In [None]:
# create new dataset advworks
advworks_dataset = bigquery.Dataset(f'{project_id}.advworks')
advworks_dataset.location = 'US'
advworks_dataset = client.create_dataset(advworks_dataset, timeout=30)

In [159]:
print_dataset_details(project_id)

There are 3 datasets in DATAENGONGCP project
dataset 1: advworks
............................................
There are 2 tables in advworks dataset
table 1: sales
table 2: stage_sales
dataset 2: retail
............................................
There are 3 tables in retail dataset
table 3: daily_product_revenue
table 4: order_items
table 5: orders
dataset 3: sms_db
............................................
There are 3 tables in sms_db dataset
table 6: users
table 7: users_duplicate
table 8: users_stg


In [84]:
# create tables
table_id = f'{project_id}.advworks.sales'

jobs_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField('OrderDate', 'STRING'),
        bigquery.SchemaField('StockDate', 'STRING'),
        bigquery.SchemaField('OrderNumber', 'STRING'),
        bigquery.SchemaField('ProductKey', 'STRING'),
        bigquery.SchemaField('CustomerKey', 'STRING'),
        bigquery.SchemaField('TerritoryKey', 'STRING'),
        bigquery.SchemaField('OrderLineItem', 'STRING'),
        bigquery.SchemaField('OrderQuantity', 'STRING'),
    ]
)

# job_config = bigquery.LoadJobConfig(
#     source_format=bigquery.SourceFormat.PARQUET,
# )
file_uri = 'gs://lpkadvworks/data/AW_Sales/AdventureWorks_Sales_formatted_2015.csv'

In [85]:
load_job = client.load_table_from_uri(
    file_uri, table_id, job_config=jobs_config
)
load_job.result()

LoadJob<project=dataengongcp, location=US, id=30fa7a06-eaaf-4bb9-872f-575bb43ceae4>

In [86]:
sales_table = client.get_table(table_id)
print(f'Loaded {sales_table.num_rows} rows to table {table_id}')

Loaded 2631 rows to table dataengongcp.advworks.sales


In [None]:
# validate by reading table from bigquery using pandas



In [100]:
query = f'''
    SELECT *
    FROM `{project_id}.advworks.sales`
'''
df = pd.read_gbq(query, project_id=project_id)
df.head()

Unnamed: 0,OrderDate,StockDate,OrderNumber,ProductKey,CustomerKey,TerritoryKey,OrderLineItem,OrderQuantity
0,2015-01-04,2001-09-15,SO45098,310,29167,1,1,1
1,2015-01-06,2001-09-24,SO45103,310,29140,1,1,1
2,2015-01-08,2001-11-01,SO45114,310,29146,1,1,1
3,2015-01-09,2001-09-24,SO45121,310,29169,1,1,1
4,2015-01-14,2001-12-25,SO45152,310,29276,1,1,1


In [101]:
df['OrderQuantity'].value_counts()

1                2630
OrderQuantity       1
Name: OrderQuantity, dtype: int64

In [102]:
df.loc[df['OrderQuantity'] == 'OrderQuantity']

Unnamed: 0,OrderDate,StockDate,OrderNumber,ProductKey,CustomerKey,TerritoryKey,OrderLineItem,OrderQuantity
2630,OrderDate,StockDate,OrderNumber,ProductKey,CustomerKey,TerritoryKey,OrderLineItem,OrderQuantity


In [97]:
query = f'''
    SELECT ProductKey,
        COUNT(ProductKey) as NumOrders
    FROM `{project_id}.advworks.sales`
    GROUP BY 1
    ORDER BY COUNT(OrderNumber) DESC
'''
top_date_quantity_df = pd.read_gbq(query, project_id=project_id)


Unnamed: 0,OrderDate,StockDate,OrderNumber,ProductKey,CustomerKey,TerritoryKey,OrderLineItem,OrderQuantity
0,2015-01-04,2001-09-15,SO45098,310,29167,1,1,1
1,2015-01-06,2001-09-24,SO45103,310,29140,1,1,1
2,2015-01-08,2001-11-01,SO45114,310,29146,1,1,1
3,2015-01-09,2001-09-24,SO45121,310,29169,1,1,1
4,2015-01-14,2001-12-25,SO45152,310,29276,1,1,1


In [99]:
top_date_quantity_df.tail()

Unnamed: 0,ProductKey,NumOrders
40,322,24
41,330,24
42,346,24
43,347,22
44,ProductKey,1
