[Prospa]
-------------

How to do it?
-----------------------
Fork this repo, build your ETL process and commit the code with your answers. Open a Pull Request and send us a message highlighting the test is completed.

#### Rules
* it must come with step by step instructions to run the code.
* please, be mindful that your code might be moved or deleted after we analyse the PR. 
* use the best practices
* be able to explain from the ground up the whole process on face to face interview

[Faruque]
-------------

* Please run the Jupyter Notebook (1 The small ETL project.ipynb) . The Notebook contains steps to run the code. 
* Solution contains suitable best practices

[Faruque]
-------------

### Tools and Technologies
Due to PROSPA recommendations following tools and technologies were used

Programming Language : Python 3
Database: Mysql 5.X

For Bonus point SQLITE database were used to generate new dataset

Reporting and Visualization: Jupyter Notebook 

Development and tested OS- Ubuntu 18.04

[Prospa]
-------------

The small ETL project
--------- 

1. The data for this exercise can be found on the `data.zip` file. Can you describe the file format?

[Faruque]
-------------


1. `*.tlb` files contains Pipe(|) Delimeted structured Text file.  One extra pipe was found at the end of each line. Therefore, during data processing that pipe was dropped.

[Prospa]
-------------

**Super Bonus**: generate your own data through the instructions on the encoded file `bonus_etl_data_gen.txt`.
To get the bonus points, please encoded the file with the instructions were used to generate the files.

[Faruque]
-------------


`bonus_etl_data_gen.txt` is an encoded file. `cat  ../bonus_etl_data_gen.txt | base64 --decode` command is used to decrypt. I started to develop the solution on MySQL , to get this extra **Super Bonus** i installed SQLITE as per the istructions and a new dataset was prepared with the scale factor of 0.03

In [1]:
!cat  ../bonus_etl_data_gen.txt | base64 --decode

Use the instruction on https://github.com/lovasoa/TPCH-sqlite to generate your data files.
The data.zip file were generated with scale factor of 0.01
Please, encode your file with the instruction you used to generate the data files.


[Prospa]
-------------

2. Code you scripts to load the data into a database.


[Faruque]
-------------

  `./python/start_etl.py` scripts perform the relevent ETL job

[Prospa]
-------------
3. Design a star schema model which the data should flow.

4. Build your process to load the data into the star schema 

**Bonus** point: 
- add a fields to classify the customer account balance in 3 groups 
- add revenue per line item 
- convert the dates to be distributed over the last 2 years




[Faruque]
-------------
Star schema model is designed and releven Dimension and Fact tables were created. 
`./python/prepare_dim_fact_table.py	` scripts is used to load data into star schema

**Bonus** point: 
- a new table  called CUSTOMER_STATUS is created using following SQL where each customer is classified based on his account balence


``` SELECT C1.C_CUSTKEY, C1.C_ACCTBAL  ,
        CASE 
            WHEN C1.C_ACCTBAL < LOWERQUARTILE     THEN 'LOW'
            WHEN C1.C_ACCTBAL < UPPERQUARTILE    THEN 'MID'
            ELSE 'HIGH'
        END AS C_STATUS
    FROM CUSTOMER C1
    CROSS JOIN (
        SELECT 
        MIN(C_ACCTBAL) ,
        MAX(C_ACCTBAL) ,
        ((MIN(C_ACCTBAL) + AVG(C_ACCTBAL)) / 2) AS LOWERQUARTILE,
        ((MAX(C_ACCTBAL) + AVG(C_ACCTBAL)) / 2) AS UPPERQUARTILE
        FROM CUSTOMER
        ) C2         ```
        
        
-   ORDER_ITEM_FACT table contains revenue per line item, Following logic was used to calculate revenue per line item
``` 
 ( L_QUANTITY *L_EXTENDEDPRICE - L_DISCOUNT) LINEITEM_REVENUE  ,
```
- convert the dates to be distributed over the last 2 years

If I understood the question properly, I need to convert all the date values. If the value is not within the last 2 years (20016 and 2017) then change the year value to 2016 or 2017. The logic I used to complete the task is  if the year value is even then replace the year(yyyy) with 2016 otherwise replace with 2017

example -  1992-12-30  will be converted to 2016-12-30
The logic used to have 2 years data  

```   CASE
        WHEN
            MOD(YEAR(O_ORDERDATE), 2) = 0
        THEN
            STR_TO_DATE(CONCAT('2016', SUBSTRING(O_ORDERDATE, 5, 6)),
                    '%Y-%m-%d')
        ELSE STR_TO_DATE(CONCAT('2017', SUBSTRING(O_ORDERDATE, 5, 6)),
                '%Y-%m-%d')
    END AS ODATE2  
```


[Prospa]
-------------


5. How to schedule this process to run multiple times per day?
 
**Bonus**: What to do if the data arrives in random order and times via streaming?


[Faruque]
-------------

My development platform was Ubuntu 18.04. 
Using cron , the job can be scheduled. If we need to run the job every hour then the cron syntax would be

`0 * * * * python /<scripts path>/python/start_etl.py` 


If the solution is deployed on DCOS then metronome can be used for scheduling the job. Apache Airflow can be another alternative.



This ETL solution is developed for batch procssing and considered there will be no dealy in file landing.  To manages streaming solution current proposed tools are not sufficient. 


Design of the ETL depends on other lots of factors. If , for any reason, csv files arrive late, let say delay is 2 days. then Partition strategy , loading time and extra logic in the data procescing scripts are required. 

To handle Streaming data there are few solutions  available. We can use Flume , Kafka and Spark Streaming to solve streaming data loading.



[Prospa]
-------------


6. How to deploy this code?

**Bonus**: Can you make it to run on a container like process (Docker)? 

[Faruque]
---------

Yes, I can containarize the solution using Docker. I may have a relevent Docker file to share in the presentation.


[Prospa]
-------------

Data Reporting
-------
One of the most important aspects to build a DWH is to deliver insights to end-users. Besides the question bellow, what extra insights you can think of can be generated from this dataset?

Can you using the designed star schema (or if you prefer the raw data), generate SQL statements to answer the following questions:

1. What are the top 5 nations in terms of revenue?

2. From the top 5 nations, what is the most common shipping mode?

3. What are the top selling months?

4. Who are the top customer in terms of revenue and/or quantity?

5. Compare the sales revenue of on current period against previous period?

[Faruque]
-------
### 1. What are the top 5 nations in terms of revenue?

In [2]:
import pandas as pd
import os
import json

import pymysql
with open(os.getcwd()+'/config/config.json') as config_file:
        params = json.load(config_file)

def get_db_connection():
    with open(os.getcwd()+'/config/config.json') as config_file:
        user = params['db_user']
        passw = params['dbpassword']  
        host =  params['hostname']
        port = params['port']
        database=params['databasename']
        conn = pymysql.connect(host,user,passw,database )
        return conn
    
    
conn = get_db_connection()
query = """SELECT C.N_NAME COUNTRY, SUM(OI.LINEITEM_REVENUE)/1000000   'TOTAL_REVENUE(Million)'
FROM CUSTOMER_DIM C, ORDER_ITEM_FACT OI 
WHERE    C.C_CUSTKEY = OI.O_CUSTKEY 
GROUP BY C.N_NAME
ORDER BY 2 DESC
LIMIT 5;"""
df = pd.read_sql(query, conn  )
df

Unnamed: 0,COUNTRY,TOTAL_REVENUE(Million)
0,CANADA,7464.797
1,EGYPT,7287.1797
2,IRAN,7109.8149
3,ALGERIA,6643.4444
4,BRAZIL,6580.5248


### 2. From the top 5 nations, what is the most common shipping mode?

In [3]:
conn = get_db_connection()
query = """ SELECT OI.L_SHIPMODE, COUNT(*) CNT FROM CUSTOMER_DIM C, ORDER_ITEM_FACT OI 
 WHERE C.C_CUSTKEY = OI.O_CUSTKEY 
AND C.N_NAME IN 
(SELECT A.COUNTRY FROM(
SELECT C.N_NAME COUNTRY, SUM(OI.LINEITEM_REVENUE) 
FROM CUSTOMER_DIM C, ORDER_ITEM_FACT OI 
WHERE    C.C_CUSTKEY = OI.O_CUSTKEY 
GROUP BY C.N_NAME
ORDER BY 2 DESC
LIMIT 5) A)
 GROUP BY OI.L_SHIPMODE  
ORDER BY 2; """
df = pd.read_sql(query, conn  )
df

Unnamed: 0,L_SHIPMODE,CNT
0,RAIL,4076
1,TRUCK,4122
2,SHIP,4130
3,REG AIR,4182
4,AIR,4226
5,MAIL,4232
6,FOB,4236


### 3 What are the top selling months?

In [4]:
conn = get_db_connection()
query = """ SELECT    
SUBSTRING(O_ORDERDATE , 1,7) 'YYYY-MM' , SUM(LINEITEM_REVENUE) TOTAL_REVENUE  
FROM ORDER_ITEM_FACT 
GROUP BY SUBSTRING(O_ORDERDATE , 1,4),  SUBSTRING(O_ORDERDATE , 1,7)   
ORDER BY 2 DESC;
 """
df = pd.read_sql(query, conn  )
df

Unnamed: 0,YYYY-MM,TOTAL_REVENUE
0,2016-03,3869116000.0
1,2016-05,3742992000.0
2,2016-01,3730390000.0
3,2016-07,3634146000.0
4,2016-04,3582271000.0
5,2016-06,3551666000.0
6,2016-02,3504858000.0
7,2016-08,3073819000.0
8,2017-12,2943943000.0
9,2017-11,2924315000.0


### 4 Who are the top customer in terms of revenue and/or quantity?
##### TOP CUSTOMER BY REVENUE

In [5]:
conn = get_db_connection()
query = """ SELECT C.C_NAME , SUM(OI.LINEITEM_REVENUE ) TOTAL_REVENUE
FROM CUSTOMER_DIM C, ORDER_ITEM_FACT OI 
WHERE    C.C_CUSTKEY = OI.O_CUSTKEY 
GROUP BY C_NAME
ORDER BY 2 DESC
LIMIT 1
 """
df = pd.read_sql(query, conn  )
df

Unnamed: 0,C_NAME,TOTAL_REVENUE
0,Customer#000001489,387584258.0


##### TOP CUSTOMER BY QUANTITY

In [6]:
conn = get_db_connection()
query = """ SELECT C.C_NAME , SUM(OI.L_QUANTITY ) TOTAL_QTY
FROM CUSTOMER_DIM C, ORDER_ITEM_FACT OI 
WHERE    C.C_CUSTKEY = OI.O_CUSTKEY 
GROUP BY C_NAME
ORDER BY 2 DESC
LIMIT 1
 """
df = pd.read_sql(query, conn  )
df

Unnamed: 0,C_NAME,TOTAL_QTY
0,Customer#000001489,7736.0


### 5 Compare the sales revenue of on current period against previous period?

In [7]:
import pandas as pd
import os
import json

import pymysql
with open(os.getcwd()+'/config/config.json') as config_file:
        params = json.load(config_file)

def get_db_connection():
    with open(os.getcwd()+'/config/config.json') as config_file:
        user = params['db_user']
        passw = params['dbpassword']  
        host =  params['hostname']
        port = params['port']
        database=params['databasename']
        conn = pymysql.connect(host,user,passw,database )
        return conn
    
    
conn = get_db_connection()
query = """ SELECT SUBSTRING(O_ORDERDATE , 1,4) TX_YEAR, SUBSTRING(O_ORDERDATE , 6,2)  TX_MONTH , 
SUM(LINEITEM_REVENUE) total_revenue  FROM ORDER_ITEM_FACT  GROUP BY  SUBSTRING(O_ORDERDATE , 1,4), SUBSTRING(O_ORDERDATE , 6,2)    ORDER BY 1,2;
;"""
ds1 = pd.read_sql(query, conn  )
ds1["total_revenue"] = ds1.total_revenue.astype(float)
df_2016 = ds1.query("TX_YEAR=='2016'")
df_2017= ds1.query("TX_YEAR=='2017'")
df_cd = pd.merge(df_2016, df_2017, how='inner', on = 'TX_MONTH')
df_cd.rename(columns={"TX_YEAR_x": "TX_YEAR", "TX_MONTH": "MONTH", "total_revenue_x": "TOTAL_REVENUE.2016", "TX_YEAR_y": "TX_YEAR","total_revenue_y": "TOTAL_REVENUE.2017"}, errors="raise")

Unnamed: 0,TX_YEAR,MONTH,TOTAL_REVENUE.2016,TX_YEAR.1,TOTAL_REVENUE.2017
0,2016,1,3730390000.0,2017,2685772000.0
1,2016,2,3504858000.0,2017,2641949000.0
2,2016,3,3869116000.0,2017,2600459000.0
3,2016,4,3582271000.0,2017,2659999000.0
4,2016,5,3742992000.0,2017,2892257000.0
5,2016,6,3551666000.0,2017,2610612000.0
6,2016,7,3634146000.0,2017,2649783000.0
7,2016,8,3073819000.0,2017,2632730000.0
8,2016,9,2895060000.0,2017,2726494000.0
9,2016,10,2600950000.0,2017,2890236000.0
