# Create CSV files for sales, purchases, and inventory of products

## Importing libraries

In [1]:
import datalabframework as dlf
import pandas as pd
from pyspark.sql import Window
from pyspark.sql import functions as F

## Starting engine and loading fact table

In [2]:
def start_engine():
    """Load the datalabframework project and return its engine.

    The project is obtained from ``dlf.project.load()`` called without
    arguments; the value returned is whatever ``project.engine()`` yields.
    """
    return dlf.project.load().engine()

In [3]:
engine = start_engine()

In [4]:
df = engine.load("fact_table")

INFO:dlf:{'md': {'hash': '0x1c79e1ac5aa3d533', 'url': 'hdfs://bigdata-m.teko.vn:18020/teko/prod/etl/fact/fact_table', 'service': 'hdfs', 'format': 'parquet', 'host': 'bigdata-m.teko.vn', 'port': 18020, 'driver': None, 'database': None, 'username': None, 'password': None, 'resource_path': 'fact_table', 'provider_path': '/teko/prod/etl/fact', 'provider_alias': 'fact', 'resource_alias': 'fact_table', 'cache': None, 'date_column': None, 'date_start': None, 'date_end': None, 'date_window': None, 'date_partition': None, 'update_column': None, 'hash_column': None, 'state_column': None, 'options': {}, 'mapping': {}}, 'mode': None, 'records': 5401350, 'columns': 96, 'time': 14.03477175347507, 'time_core': 7.699648783542216, 'time_prep': 6.335121406242251}


## Selecting transactions and fields

In [5]:
# Project the fact table down to the seven columns this notebook works with.
transactions = df.select(
    'transaction_date',
    'sku_id',
    'sku_name',
    'doc_type',
    'quantity',
    'warehouse_id',
    'end_day_quantity',
)

In [6]:
transactions.show(10)

+----------------+-------+--------------------+--------+--------+------------+----------------+
|transaction_date| sku_id|            sku_name|doc_type|quantity|warehouse_id|end_day_quantity|
+----------------+-------+--------------------+--------+--------+------------+----------------+
|      2019-02-28|1206838|phí dịch vụ giao ...|     PTX|  1.0000|      090016|        335.0000|
|      2019-02-28|1809570|smart tivi lg 55 ...|     PTX| 16.0000|      090016|          0.0000|
|      2019-02-28|1808441|khung treo nghiên...|     PTX| 16.0000|      090016|          0.0000|
|      2019-02-28|1810247|máy tính xách tay...|     PTX|  1.0000|        0201|          0.0000|
|      2019-02-28|1704995|chuột fuhlen l102...|     PTX|  5.0000|      091001|          0.0000|
|      2019-02-28|1206838|phí dịch vụ giao ...|     PTX|  1.0000|      091001|       7736.0000|
|      2019-02-28|1703670|bàn phím cơ dareu...|     PTX|  1.0000|        0601|          8.0000|
|      2019-02-28|1703944|chuột máy tính

## Listing transaction dates

In [7]:
from datetime import date

In [8]:
# Collect every distinct transaction date to the driver, in ascending order.
# The list of Rows is iterated again by the inventory loop further down.
transaction_date = (
    transactions
    .select('transaction_date')
    .distinct()
    .sort('transaction_date')
    .collect()
)
# Bare date values; passed to the Spark pivots as the explicit column list.
columns = [row['transaction_date'] for row in transaction_date]

## Creating sales products pivot table

### Selecting sale transactions

In [9]:
# Total quantity per (transaction_date, sku_id) over the rows whose doc_type
# is 'PTX' or 'HDF' -- the document types this notebook counts as sales.
sales = (
    transactions
    .filter(F.col('doc_type').isin('PTX', 'HDF'))
    .select('transaction_date', 'sku_id', 'quantity')
    .groupBy('transaction_date', 'sku_id')
    .agg(F.sum('quantity').alias('sell_quantity'))
)

In [10]:
sales.orderBy('transaction_date').show(20)

+----------------+-------+-------------+
|transaction_date| sku_id|sell_quantity|
+----------------+-------+-------------+
|      2017-01-01|1601431|       1.0000|
|      2017-01-01|1603841|       1.0000|
|      2017-01-01|1402753|       1.0000|
|      2017-01-01|1602867|       3.0000|
|      2017-01-01|1302572|      27.0000|
|      2017-01-01|1201142|       1.0000|
|      2017-01-01|1601555|       5.0000|
|      2017-01-01|1601619|       2.0000|
|      2017-01-01|1203604|       1.0000|
|      2017-01-01|1502689|       1.0000|
|      2017-01-01|1200869|       1.0000|
|      2017-01-01|1200870|       1.0000|
|      2017-01-01|1601780|      26.0000|
|      2017-01-01|1402437|       1.0000|
|      2017-01-01|1601291|       2.0000|
|      2017-01-01|1601632|       1.0000|
|      2017-01-01|1600817|       1.0000|
|      2017-01-01|1603017|       1.0000|
|      2017-01-01|1207133|       3.0000|
|      2017-01-01|1204495|       3.0000|
+----------------+-------+-------------+
only showing top 20 rows

In [12]:
sales.select('sku_id', 'sell_quantity').filter((sales.transaction_date == '2018-09-30') & (sales.sku_id == 1807457)).show()

+-------+-------------+
| sku_id|sell_quantity|
+-------+-------------+
|1807457|       9.0000|
+-------+-------------+



### Creating pivot table by transaction_date

In [13]:
# Wide table: one row per sku_id, one column per date listed in `columns`,
# each cell holding the summed sell_quantity. Rows are sorted by sku_id.
sales_df = (
    sales
    .groupBy('sku_id')
    .pivot('transaction_date', columns)
    .sum('sell_quantity')
    .sort('sku_id')
)

In [14]:
sales_df = sales_df.fillna(0)

In [15]:
sales_df.select('sku_id', '2018-06-01', '2018-06-02').show()

+-------+----------+----------+
| sku_id|2018-06-01|2018-06-02|
+-------+----------+----------+
|1200010|    0.0000|    0.0000|
|1200108|    0.0000|    0.0000|
|1200109|    0.0000|    4.0000|
|1200110|    0.0000|    1.0000|
|1200111|    0.0000|    0.0000|
|1200112|    0.0000|    0.0000|
|1200113|    3.0000|    5.0000|
|1200114|    1.0000|    0.0000|
|1200118|    0.0000|    0.0000|
|1200119|    4.0000|    2.0000|
|1200122|    2.0000|    3.0000|
|1200124|    0.0000|    0.0000|
|1200125|    0.0000|    0.0000|
|1200131|    0.0000|    0.0000|
|1200132|    0.0000|    1.0000|
|1200133|    0.0000|    0.0000|
|1200135|    0.0000|    0.0000|
|1200137|    0.0000|    0.0000|
|1200138|    1.0000|    1.0000|
|1200140|    0.0000|    0.0000|
+-------+----------+----------+
only showing top 20 rows



### Saving to file

In [16]:
sales_df.toPandas().to_csv('sales_1.csv', index = False)

## Creating purchases products pivot table

### Selecting purchase transactions

In [15]:
# Total quantity per (transaction_date, sku_id) over the rows whose doc_type
# is 'PNA' -- the document type this notebook counts as purchases.
purchases = (
    transactions
    .filter(F.col('doc_type') == 'PNA')
    .select('transaction_date', 'sku_id', 'quantity')
    .groupBy('transaction_date', 'sku_id')
    .agg(F.sum('quantity').alias('buy_quantity'))
)

In [16]:
purchases.orderBy('transaction_date').show(20)

+----------------+-------+------------+
|transaction_date| sku_id|buy_quantity|
+----------------+-------+------------+
|      2017-01-01|1204071|     30.0000|
|      2017-01-01|1500184|      8.0000|
|      2017-01-01|1204075|      1.0000|
|      2017-01-01|1204077|     13.0000|
|      2017-01-01|1204070|     96.0000|
|      2017-01-01|1300888|      1.0000|
|      2017-01-01|1301401|     68.0000|
|      2017-01-01|1300886|    131.0000|
|      2017-01-01|1204072|      1.0000|
|      2017-01-02|1203021|      5.0000|
|      2017-01-02|1301199|      1.0000|
|      2017-01-02|1404738|      5.0000|
|      2017-01-02|1203128|      2.0000|
|      2017-01-02|1503187|      2.0000|
|      2017-01-02|1302625|      2.0000|
|      2017-01-02|1601032|      2.0000|
|      2017-01-02|1203022|      3.0000|
|      2017-01-02|1602156|      2.0000|
|      2017-01-02|1400700|      5.0000|
|      2017-01-02|1203052|      5.0000|
+----------------+-------+------------+
only showing top 20 rows



### Creating pivot table

In [17]:
# Wide table: one row per sku_id, one column per date listed in `columns`,
# each cell holding the summed buy_quantity. Rows are sorted by sku_id.
purchases_df = (
    purchases
    .groupBy('sku_id')
    .pivot('transaction_date', columns)
    .sum('buy_quantity')
    .sort('sku_id')
)

In [18]:
purchases_df = purchases_df.fillna(0)

In [19]:
purchases_df.select('sku_id', '2018-06-01', '2018-06-02').show()

+----------+----------+
|2018-06-01|2018-06-02|
+----------+----------+
|    0.0000|    0.0000|
|    0.0000|    0.0000|
|    0.0000|    0.0000|
|    0.0000|    0.0000|
|    0.0000|    0.0000|
|   10.0000|    0.0000|
|    5.0000|    0.0000|
|    0.0000|    0.0000|
|    3.0000|    0.0000|
|    0.0000|    0.0000|
|    0.0000|    0.0000|
|    0.0000|    0.0000|
|    0.0000|    0.0000|
|    5.0000|    0.0000|
|    0.0000|    0.0000|
|    0.0000|    0.0000|
|    0.0000|    0.0000|
|    0.0000|    0.0000|
|    0.0000|    0.0000|
|    0.0000|    0.0000|
+----------+----------+
only showing top 20 rows



### Saving to file

In [20]:
purchases_df.toPandas().to_csv('purchases.csv', index = False)

## Creating inventory pivot table

### Dropping duplicate transactions of a sku_id on the same day

In [20]:
# Keep one row per distinct (transaction_date, sku_id, warehouse_id,
# end_day_quantity) combination, ordered by transaction_date.
transactions_distinct = (
    transactions
    .select('transaction_date', 'sku_id', 'warehouse_id', 'end_day_quantity')
    .distinct()
    .sort('transaction_date')
)

In [21]:
transactions_distinct.count()

3008254

### Calculating the inventory quantity of each sku_id by day

In [22]:
def num_inventory(date):
    """Return the inventory quantity of every sku_id as of ``date``.

    For each (warehouse_id, sku_id) pair, the rows of the module-level
    ``transactions_distinct`` DataFrame dated on or before ``date`` are
    ranked from newest to oldest, and only the newest are kept. Their
    ``end_day_quantity`` values are then summed per sku_id.

    Args:
        date: Anything ``pd.to_datetime`` accepts (the notebook passes both
            'YYYY-MM-DD' strings and ``datetime.date`` objects). Only the
            calendar-date part is used.

    Returns:
        A Spark DataFrame with columns ``transaction_date`` (the requested
        date as a literal), ``sku_id`` and ``num_inventory``.
    """
    as_of = pd.to_datetime(date).date()

    # Newest transaction first within each (warehouse, sku) pair.
    newest_first = (
        Window
        .partitionBy('warehouse_id', 'sku_id')
        .orderBy(F.desc('transaction_date'))
    )

    history = transactions_distinct.filter(F.col('transaction_date') <= as_of)
    ranked = history.select(
        'sku_id',
        'warehouse_id',
        'end_day_quantity',
        F.rank().over(newest_first).alias('rank'),
    )
    # NOTE(review): rank() gives tied rows the same rank, so if a
    # (warehouse, sku) pair has several distinct end_day_quantity values on
    # its latest date, all of them are kept and summed -- confirm intended.
    latest = ranked.where('rank == 1')

    per_sku = latest.groupby('sku_id').agg(
        F.sum('end_day_quantity').alias('num_inventory')
    )
    stamped = per_sku.withColumn('transaction_date', F.lit(as_of))
    return stamped.select('transaction_date', 'sku_id', 'num_inventory')

#### Testing for a specific date

In [47]:
num_inventory('2017-01-01').count()

838

### Creating an empty dataframe to append data of each day

In [19]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").getOrCreate()

In [20]:
# Import only the type classes used below; a wildcard import would pull
# every public name of pyspark.sql.types into the notebook namespace.
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    StructField,
    StructType,
    TimestampType,
)
sc = spark.sparkContext

# Schema of the per-day inventory rows: date, SKU and quantity, all nullable.
field = [
    StructField("transaction_date", TimestampType(), True),
    StructField("sku_id", IntegerType(), True),
    StructField("num_inventory", DoubleType(), True),
]
schema = StructType(field)

# Empty DataFrame carrying that schema.
inventory_df = spark.createDataFrame(sc.emptyRDD(), schema)

### Iterating to calculate the inventory quantity for each day

In [23]:
import os

# Append each day's inventory snapshot to a single CSV file.
#
# The file is later loaded with pandas' default header handling and its
# columns are looked up by name, so it must begin with a header row. The
# header is therefore written exactly once: when the file does not exist yet
# or is still empty. Appending to an existing non-empty file is unchanged.
#
# NOTE(review): only dates after 2018-12-22 are computed; this looks like a
# cutoff for resuming a partially completed run -- confirm before reusing.
output_path = 'inventory_df_append.csv'
for row in transaction_date:
    day = row.transaction_date
    if day > date(2018, 12, 22):
        needs_header = (
            not os.path.exists(output_path)
            or os.path.getsize(output_path) == 0
        )
        num_inventory(day).toPandas().to_csv(
            output_path, index=False, header=needs_header, mode='a'
        )
    # Progress marker: print the first day of every month as it is reached.
    if day.day == 1:
        print(day)

2017-01-01
2017-02-01
2017-03-01
2017-04-01
2017-05-01
2017-06-01
2017-07-01
2017-08-01
2017-09-01
2017-10-01
2017-11-01
2017-12-01
2018-01-01
2018-02-01
2018-03-01
2018-04-01
2018-05-01
2018-06-01
2018-07-01
2018-08-01
2018-09-01
2018-10-01
2018-11-01
2018-12-01
2019-01-01
2019-02-01
2019-03-01


In [18]:
from datetime import date
transaction_date[0].transaction_date <= date(2017, 1, 20)

True

### Creating a pivot table

In [24]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").getOrCreate()

In [62]:
# Load the accumulated per-day inventory rows. The file must start with a
# header row naming transaction_date, sku_id and num_inventory: the pivot
# below looks these columns up by name.
inventory_pd_df = pd.read_csv('inventory_df_append.csv')

In [63]:
import numpy as np

In [64]:
# Reshape to one row per sku_id and one column per transaction_date, summing
# num_inventory and filling missing (sku_id, date) cells with 0.
# The aggregation is passed by name: recent pandas versions deprecate
# passing np.sum here, and 'sum' keeps the same behaviour.
inventory_df_pivot = inventory_pd_df.pivot_table(
    values='num_inventory',
    index=['sku_id'],
    columns=['transaction_date'],
    aggfunc='sum',
    fill_value=0,
)

In [65]:
inventory_df_pivot.columns.name = None

In [66]:
inventory_df_pivot = inventory_df_pivot.reset_index()

In [67]:
inventory_df_pivot.iloc[:10, :10]

Unnamed: 0,sku_id,2017-01-01,2017-01-02,2017-01-03,2017-01-04,2017-01-05,2017-01-06,2017-01-07,2017-01-08,2017-01-09
0,1200010,0,0,0,0,0,0,0,0,0
1,1200108,0,0,0,0,1,1,1,1,1
2,1200109,0,7,7,7,14,14,13,11,14
3,1200110,0,0,4,4,6,6,6,6,8
4,1200111,0,0,0,0,0,0,0,0,0
5,1200112,0,0,0,0,0,0,0,0,0
6,1200113,0,1,9,14,23,20,18,13,24
7,1200114,0,4,5,5,16,15,15,15,13
8,1200118,0,0,0,0,0,0,0,0,0
9,1200119,0,4,9,7,18,17,17,17,17


In [68]:
inventory_df_pivot.to_csv('inventory.csv', index = False)

In [69]:
engine.stop()