In [39]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
# Create (or reuse) a Spark session running on the 'local' master, with the
# driver memory setting 'spark.driver.memory' set to '16g'.
session_builder = (
    SparkSession.builder
    .master('local')
    .appName('Read_file')
    .config('spark.driver.memory', '16g')
)
spark = session_builder.getOrCreate()



In [3]:
# Print the JAVA_HOME environment variable as seen by the shell.
# NOTE(review): presumably a sanity check of which Java install Spark will use — confirm.
! echo $JAVA_HOME

/Library/Internet Plug-Ins/JavaAppletPlugin.plugin/Contents/Home


In [4]:
# Read one Parquet part file from the local Downloads folder into a DataFrame.
# NOTE(review): absolute, user-specific path — the notebook only runs on this machine.
parquet_path = '/Users/gabriele.sabato/Downloads/part-00000-ea5756f7-5606-4a48-b72c-5ffc455f06fc-c000.snappy.parquet'
parquetFile = spark.read.parquet(parquet_path)

In [5]:


# Register the loaded DataFrame as the temp view 'tmp_table' so it can be
# queried through spark.sql(...).
parquetFile.createOrReplaceTempView('tmp_table')


In [6]:
# Preview the first 10 rows of the raw data.
parquetFile.show(10)

+----------+------------------+-------------+-------------+-------------+-----------------+-------------+-------------------+-------+----------+----------+---------------+------------+-------------+
|      date|         item_code|   item_price|    GOI_final|    GOV_final|        NOI_final|    NOV_final|voucher_value_final|N_sales|   PC1_fin|   PC2_fin|total_costs_fin|Margin_final|delivery_days|
+----------+------------------+-------------+-------------+-------------+-----------------+-------------+-------------------+-------+----------+----------+---------------+------------+-------------+
|2020-01-01|000000001000000270|  36.99000000|  36.99000000|  36.99000000|  31.084033613445|  31.08403400|                  0|      0| 13.840000|  6.450000|      20.290000|    0.000000|            4|
|2020-01-01|000000001000009133|1899.99000000|1899.99000000|1899.99000000|1596.630252100840|1596.63025200|                  0|      0|592.000000| 90.270000|     682.270000|    0.000000|           60|
|2020

In [7]:
# Count how many distinct item codes appear in tmp_table.
spark.sql("""SELECT COUNT(distinct  item_code)
from tmp_table""").show()

+-------------------------+
|count(DISTINCT item_code)|
+-------------------------+
|                    63136|
+-------------------------+



In [8]:
# Items sold: sum n_sales per item_code over tmp_table, then count the item
# codes whose total is strictly positive.
spark.sql("""WITH sales as (SELECT item_code, SUM(n_sales) as total_sale
                                from tmp_table
                                GROUP BY item_code
                                )
                                SELECT COUNT(DISTINCT item_code)
                                from sales
                                where total_sale > 0
                """).show()

+-------------------------+
|count(DISTINCT item_code)|
+-------------------------+
|                    61824|
+-------------------------+



In [9]:
# Items never sold: same per-item total as the cell above, but count the item
# codes whose summed n_sales is exactly 0.
spark.sql("""WITH sales as (SELECT item_code, SUM(n_sales) as total_sale
                                from tmp_table
                                GROUP BY item_code
                                )
                                SELECT COUNT(DISTINCT item_code)
                                from sales
                                where total_sale = 0
                """).show()

+-------------------------+
|count(DISTINCT item_code)|
+-------------------------+
|                     1312|
+-------------------------+



In [10]:
# Distinct item codes that have at least one row with N_sales > 0.
# Uses F.col because only `functions as F` is imported (bare `col` raises
# NameError on a fresh kernel), and filters before projecting so the predicate
# does not reference a column that has already been dropped.
new_file = (
    parquetFile
    .where(F.col('N_sales') > 0)
    .select('item_code')
    .distinct()
)


In [11]:
# Count the distinct item codes in new_file.
# F.countDistinct: the bare name is not imported anywhere in the notebook.
new_file.select(F.countDistinct("item_code")).show()

+-------------------------+
|count(DISTINCT item_code)|
+-------------------------+
|                    61824|
+-------------------------+



In [12]:
# Copy of the data without rows containing nulls (na.drop() with its default
# arguments); parquetFile itself is left unchanged here.
no_null = parquetFile.na.drop()

In [13]:
# Count the distinct item codes that survive the null drop.
# F.countDistinct: the bare name is not imported anywhere in the notebook.
no_null.select(F.countDistinct("item_code")).show()

+-------------------------+
|count(DISTINCT item_code)|
+-------------------------+
|                    61892|
+-------------------------+



In [14]:
# Expose the null-free DataFrame to SQL as the temp view 'no_null'.
no_null.createOrReplaceTempView('no_null')

In [15]:
# Count the item codes in the null-free view 'no_null' whose summed n_sales is
# exactly 0.
spark.sql("""WITH sales as (SELECT item_code, SUM(n_sales) as total_sale
                                from no_null
                                GROUP BY item_code
                                )
                                SELECT COUNT(DISTINCT item_code)
                                from sales
                                where total_sale = 0
                """).show()

+-------------------------+
|count(DISTINCT item_code)|
+-------------------------+
|                       68|
+-------------------------+



In [16]:
# Item codes in the 'no_null' view whose n_sales sums to 0 across all rows.
lost_items = spark.sql("""SELECT item_code
                           FROM no_null
                           GROUP BY item_code
                           HAVING SUM(n_sales) = 0""")

In [17]:
# Preview the zero-sales item codes (show() prints the top 20 rows by default).
lost_items.show()

+------------------+
|         item_code|
+------------------+
|000000001000019797|
|000000001000168663|
|000000001000016155|
|000000001000113176|
|000000001000123348|
|000000001000101821|
|000000001000200213|
|000000001000009714|
|000000001000081704|
|000000001000057418|
|000000001000129954|
|000000001000205046|
|000000001000174856|
|000000001000203700|
|000000001000203665|
|000000001000174208|
|000000001000190727|
|000000001000014232|
|000000001000036778|
|000000001000189003|
+------------------+
only showing top 20 rows



In [18]:
# Pull the zero-sales item codes to the driver as a plain Python list.
lost_items_list = [record.item_code for record in lost_items.collect()]

In [19]:
# All rows of parquetFile belonging to the zero-sales item codes, ordered by
# item code and then date.
# F.col replaces the bare `col` (never imported -> NameError on a fresh kernel);
# the no-op selectExpr("*") is dropped.
lost_items_df = (
    parquetFile
    .where(F.col('item_code').isin(lost_items_list))
    .orderBy(F.col('item_code').asc(), F.col('date'))
)

In [20]:
# Print up to 500 rows of the zero-sales items for manual inspection.
# NOTE(review): this produces a very large cell output; consider a smaller preview.
lost_items_df.show(500)

+----------+------------------+-------------+-------------+-------------+-----------------+-------------+-------------------+-------+--------+--------+---------------+------------+-------------+
|      date|         item_code|   item_price|    GOI_final|    GOV_final|        NOI_final|    NOV_final|voucher_value_final|N_sales| PC1_fin| PC2_fin|total_costs_fin|Margin_final|delivery_days|
+----------+------------------+-------------+-------------+-------------+-----------------+-------------+-------------------+-------+--------+--------+---------------+------------+-------------+
|2020-01-01|000000001000001358| 218.99000000| 218.99000000| 218.99000000| 184.025210084034| 184.02521000|                  0|      0|0.000000|0.000000|       0.000000|    0.000000|            3|
|2020-01-02|000000001000001358| 218.99000000| 218.99000000| 218.99000000| 184.025210084034| 184.02521000|                  0|      0|0.000000|0.000000|       0.000000|    0.000000|            3|
|2020-01-03|0000000010000

In [21]:
# Sanity check: list any row of the zero-sales items whose N_sales is non-zero.
# F.col replaces the bare `col` (never imported); the no-op selectExpr("*") is dropped.
lost_items_df.where(F.col("N_sales") != 0).show()


+----+---------+----------+---------+---------+---------+---------+-------------------+-------+-------+-------+---------------+------------+-------------+
|date|item_code|item_price|GOI_final|GOV_final|NOI_final|NOV_final|voucher_value_final|N_sales|PC1_fin|PC2_fin|total_costs_fin|Margin_final|delivery_days|
+----+---------+----------+---------+---------+---------+---------+-------------------+-------+-------+-------+---------------+------------+-------------+
+----+---------+----------+---------+---------+---------+---------+-------------------+-------+-------+-------+---------------+------------+-------------+



In [22]:
# Rebind parquetFile to its null-free version (same expression as `no_null`).
# NOTE(review): the 'tmp_table' temp view was registered from the DataFrame as
# it was before this rebinding, and cells that use parquetFile now give
# different results depending on whether they run before or after this cell.
parquetFile = parquetFile.na.drop()

In [23]:
# Item codes whose N_sales sums to exactly 0 over parquetFile.
# F.sum / F.col are required: only `functions as F` is imported, so a bare
# `sum('N_sales')` would call Python's builtin sum and fail, and `col` is
# undefined. groupBy already yields one row per item_code, so the trailing
# distinct() was redundant and is dropped.
df_null = (
    parquetFile
    .groupBy(F.col('item_code'))
    .agg(F.sum('N_sales').alias('total_sales'))
    .where(F.col('total_sales') == 0)
    .select('item_code')
)




In [24]:
# Pull the zero-sales item codes to the driver as a plain Python list.
df_null_list = [record.item_code for record in df_null.collect()]

In [25]:
# Dump the full list of zero-sales item codes for inspection.
print(df_null_list)

['000000001000019797', '000000001000168663', '000000001000016155', '000000001000113176', '000000001000123348', '000000001000101821', '000000001000200213', '000000001000009714', '000000001000081704', '000000001000057418', '000000001000129954', '000000001000205046', '000000001000174856', '000000001000203700', '000000001000203665', '000000001000174208', '000000001000190727', '000000001000014232', '000000001000036778', '000000001000189003', '000000001000141893', '000000001000203707', '000000001000207665', '000000001000190085', '000000001000138937', '000000001000035717', '000000001000181387', '000000001000189452', '000000001000075615', '000000001000174954', '000000001000170114', '000000001000200703', '000000001000103194', '000000001000030451', '000000001000196528', '000000001000125175', '000000001000001358', '000000001000192548', '000000001000136098', '000000001000182063', '000000001000059479', '000000001000186526', '000000001000017542', '000000001000094725', '000000001000173725', '00000000

In [26]:
# Working dataset: parquetFile without the rows of items that never sold.
# F.col replaces the bare `col` (never imported -> NameError on a fresh kernel);
# the no-op selectExpr('*') is dropped.
fin_df = parquetFile.where(~F.col('item_code').isin(df_null_list))


In [27]:
# Count the distinct item codes remaining in fin_df.
# F.countDistinct / F.col: the bare names are not imported anywhere in the notebook.
fin_df.select(F.countDistinct(F.col('item_code'))).show()

+-------------------------+
|count(DISTINCT item_code)|
+-------------------------+
|                    61824|
+-------------------------+



In [28]:
# The 5000 item codes with the highest total N_sales in fin_df.
# F.sum / F.col are required: a bare `sum(...)` resolves to Python's builtin
# (TypeError on a Column) and `col` is undefined on a fresh kernel.
top_seller_items = (
    fin_df
    .groupBy(F.col('item_code'))
    .agg(F.sum(F.col('N_sales')).alias('total_sales'))
    .orderBy(F.col('total_sales').desc())
    .select('item_code')
    .limit(5000)
)

In [29]:
# Pull the top-seller item codes to the driver as a plain Python list.
top_seller_items_list = [record.item_code for record in top_seller_items.collect()]


In [30]:
# All fin_df rows that belong to one of the top-seller item codes.
# F.col replaces the bare `col` (never imported -> NameError on a fresh kernel);
# the no-op selectExpr('*') is dropped.
top_seller_df = fin_df.where(F.col('item_code').isin(top_seller_items_list))

In [31]:
# Preview the rows of the top-seller items (show() prints 20 rows by default).
top_seller_df.show()

+----------+------------------+------------+------------+------------+----------------+------------+-------------------+-------+----------+---------+---------------+------------+-------------+
|      date|         item_code|  item_price|   GOI_final|   GOV_final|       NOI_final|   NOV_final|voucher_value_final|N_sales|   PC1_fin|  PC2_fin|total_costs_fin|Margin_final|delivery_days|
+----------+------------------+------------+------------+------------+----------------+------------+-------------------+-------+----------+---------+---------------+------------+-------------+
|2020-01-01|000000001000012530| 99.99000000| 99.99000000| 99.99000000| 84.030000000000| 84.03000000|                  0|      1| 38.450000| 8.900000|      47.350000|   36.680000|            3|
|2020-01-01|000000001000012530| 99.99000000| 99.99000000| 99.99000000| 84.030000000000| 84.03000000|                  0|      1| 38.450000| 8.900000|      47.350000|   36.680000|            3|
|2020-01-01|000000001000021922|129.

In [32]:
# Collapse the top-seller rows to one row per (date, item_code):
# N_sales and Margin_final are summed, every other metric is averaged.
# All aggregates go through the `F` namespace: bare `sum` would be Python's
# builtin (TypeError) and `avg` / `col` are undefined on a fresh kernel.
agg_df = (
    top_seller_df
    .groupBy('date', 'item_code')
    .agg(F.sum('N_sales').alias('N_sales_tot'),
         F.avg('item_price').alias('item_Price'),
         F.avg('GOI_final').alias('GOI'),
         F.avg('GOV_final').alias('GOV'),
         F.avg('NOI_final').alias('NOI'),
         F.avg('NOV_final').alias('NOV'),
         F.avg('voucher_value_final').alias('voucher_value'),
         F.avg('total_costs_fin').alias('tot_costs'),
         F.sum('Margin_final').alias('Margin'),
         F.avg('delivery_days').alias('del_days'))
    .orderBy(F.col('date').asc())
    .select('date',
            'item_code',
            'N_sales_tot',
            'item_Price',
            'GOI',
            'GOV',
            'NOI',
            'NOV',
            'voucher_value',
            'tot_costs',
            'Margin',
            'del_days')
)

In [33]:
# Preview the daily per-item aggregates.
agg_df.show()


+----------+------------------+-----------+----------------+----------------+----------------+--------------------+----------------+-------------+--------------+----------+--------+
|      date|         item_code|N_sales_tot|      item_Price|             GOI|             GOV|                 NOI|             NOV|voucher_value|     tot_costs|    Margin|del_days|
+----------+------------------+-----------+----------------+----------------+----------------+--------------------+----------------+-------------+--------------+----------+--------+
|2020-01-01|000000001000151609|          0| 42.990000000000| 42.990000000000| 42.990000000000| 36.1260504201680000| 36.126050000000|       0.0000| 23.0900000000|  0.000000|     5.0|
|2020-01-01|000000001000160383|          0|129.990000000000|129.990000000000|129.990000000000|109.2352941176470000|109.235294000000|       0.0000| 66.0800000000|  0.000000|     8.0|
|2020-01-01|000000001000015884|          3| 89.990000000000| 89.990000000000| 89.990000000

In [45]:
# Keep only the rows of a single item code from fin_df.
target_item_code = '000000001000015884'
one_item = fin_df.filter(F.col('item_code') == target_item_code)

In [46]:
# Preview the raw rows of the selected item.
one_item.show()

+----------+------------------+-----------+-----------+-----------+---------------+-----------+-------------------+-------+---------+---------+---------------+------------+-------------+
|      date|         item_code| item_price|  GOI_final|  GOV_final|      NOI_final|  NOV_final|voucher_value_final|N_sales|  PC1_fin|  PC2_fin|total_costs_fin|Margin_final|delivery_days|
+----------+------------------+-----------+-----------+-----------+---------------+-----------+-------------------+-------+---------+---------+---------------+------------+-------------+
|2020-01-01|000000001000015884|89.99000000|89.99000000|89.99000000|75.620000000000|75.62000000|                  0|      1|37.280000|12.360000|      49.640000|   25.980000|           11|
|2020-01-01|000000001000015884|89.99000000|89.99000000|89.99000000|75.620000000000|75.62000000|                  0|      1|37.280000|12.360000|      49.640000|   25.980000|           11|
|2020-01-01|000000001000015884|89.99000000|89.99000000|89.9900000

In [47]:
# Output location; not referenced by any of the visible cells.
# NOTE(review): presumably where the resulting features are meant to be written — confirm.
output_path = '/tmp/price-elasticity-feature'

In [48]:
# Not referenced by any of the visible cells.
# NOTE(review): same value as the literal passed to .limit(5000) when selecting
# top sellers — presumably intended to parameterize it; confirm.
N_items = 5000

In [49]:
# Display the DataFrame repr (column names and types) of the single-item frame.
one_item

DataFrame[date: date, item_code: string, item_price: decimal(18,8), GOI_final: decimal(20,8), GOV_final: decimal(38,8), NOI_final: decimal(24,12), NOV_final: decimal(38,8), voucher_value_final: decimal(38,0), N_sales: int, PC1_fin: decimal(21,6), PC2_fin: decimal(21,6), total_costs_fin: decimal(22,6), Margin_final: decimal(21,6), delivery_days: int]

In [50]:
# List (column, type) pairs of the single-item frame.
one_item.dtypes

[('date', 'date'),
 ('item_code', 'string'),
 ('item_price', 'decimal(18,8)'),
 ('GOI_final', 'decimal(20,8)'),
 ('GOV_final', 'decimal(38,8)'),
 ('NOI_final', 'decimal(24,12)'),
 ('NOV_final', 'decimal(38,8)'),
 ('voucher_value_final', 'decimal(38,0)'),
 ('N_sales', 'int'),
 ('PC1_fin', 'decimal(21,6)'),
 ('PC2_fin', 'decimal(21,6)'),
 ('total_costs_fin', 'decimal(22,6)'),
 ('Margin_final', 'decimal(21,6)'),
 ('delivery_days', 'int')]

In [51]:
# One row per (date, item_code) for the single-item frame: N_sales and
# Margin_final are summed, the remaining metrics are averaged. The aggregate
# list fixes the output column order after the two grouping keys.
daily_metrics = [
    F.sum('N_sales').alias('N_sales_tot'),
    F.avg('item_price').alias('item_Price'),
    F.avg('GOI_final').alias('GOI'),
    F.avg('GOV_final').alias('GOV'),
    F.avg('NOI_final').alias('NOI'),
    F.avg('NOV_final').alias('NOV'),
    F.avg('voucher_value_final').alias('voucher_value'),
    F.avg('total_costs_fin').alias('tot_costs'),
    F.sum('Margin_final').alias('Margin'),
    F.avg('delivery_days').alias('del_days'),
]
output_columns = [
    'date', 'item_code', 'N_sales_tot', 'item_Price',
    'GOI', 'GOV', 'NOI', 'NOV',
    'voucher_value', 'tot_costs', 'Margin', 'del_days',
]
agg_df = (
    one_item
    .groupBy('date', 'item_code')
    .agg(*daily_metrics)
    .orderBy(F.col('date').asc())
    .select(*output_columns)
)


In [52]:
# Add delivery_weeks = ceil(del_days / 5) and keep every aggregate column
# except del_days.
# NOTE(review): the divisor 5 presumably means working days per week — confirm.
delivery_weeks_expr = F.ceil(F.col('del_days') / 5.)
kept_columns = [
    'date', 'item_code', 'N_sales_tot', 'item_Price',
    'GOI', 'GOV', 'NOI', 'NOV',
    'voucher_value', 'tot_costs', 'Margin', 'delivery_weeks',
]
a_df = (
    agg_df
    .withColumn('delivery_weeks', delivery_weeks_expr)
    .select(*kept_columns)
)


In [54]:
# List (column, type) pairs after the aggregation and the delivery_weeks step.
a_df.dtypes

[('date', 'date'),
 ('item_code', 'string'),
 ('N_sales_tot', 'bigint'),
 ('item_Price', 'decimal(22,12)'),
 ('GOI', 'decimal(24,12)'),
 ('GOV', 'decimal(38,12)'),
 ('NOI', 'decimal(28,16)'),
 ('NOV', 'decimal(38,12)'),
 ('voucher_value', 'decimal(38,4)'),
 ('tot_costs', 'decimal(26,10)'),
 ('Margin', 'decimal(31,6)'),
 ('delivery_weeks', 'bigint')]

In [55]:
from pyspark.sql import DataFrame, Window

In [75]:
# Mark where a new (delivery_weeks, GOV) run starts within each item's timeline.
# A row whose delivery_weeks AND GOV both equal the previous day's values gets
# ranking_col = NULL; every other row (including the first one per item, whose
# lag is NULL) gets its date rank inside the item.
item_timeline = Window.partitionBy(F.col('item_code')).orderBy(F.col('date').asc())

same_weeks_as_previous = (
    F.lag(F.col('delivery_weeks'), 1).over(item_timeline) == F.col('delivery_weeks')
)
same_gov_as_previous = (
    F.lag(F.col('GOV'), 1).over(item_timeline) == F.col('GOV')
)

one_item_tbl = a_df.withColumn(
    "ranking_col",
    F.when(same_weeks_as_previous & same_gov_as_previous, None)
    .otherwise(F.rank().over(item_timeline)),
)

In [65]:

# Preview the rows together with the run-start marker column ranking_col.
one_item_tbl.show()

+----------+------------------+-----------+---------------+---------------+---------------+-------------------+---------------+-------------+-------------+----------+--------------+-----------+
|      date|         item_code|N_sales_tot|     item_Price|            GOI|            GOV|                NOI|            NOV|voucher_value|    tot_costs|    Margin|delivery_weeks|ranking_col|
+----------+------------------+-----------+---------------+---------------+---------------+-------------------+---------------+-------------+-------------+----------+--------------+-----------+
|2020-01-01|000000001000015884|          3|89.990000000000|89.990000000000|89.990000000000|75.6200000000000000|75.620000000000|       0.0000|49.6400000000| 77.940000|             3|          1|
|2020-01-02|000000001000015884|          5|89.990000000000|89.990000000000|89.990000000000|75.6200000000000000|75.620000000000|       0.0000|49.6400000000|129.900000|             2|          2|
|2020-01-03|000000001000015884

In [86]:
# Forward-fill ranking_col into `ranks`: rows with a NULL marker take the most
# recent non-NULL marker at or before their date (per item); rows that already
# carry a marker keep it.
rows_up_to_current = (
    Window.partitionBy(F.col('item_code'))
    .orderBy(F.col('date').asc())
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
latest_marker = F.last(F.col('ranking_col'), True).over(rows_up_to_current)

one_item_tbl2 = one_item_tbl.withColumn(
    "ranks",
    F.when(F.col('ranking_col').isNull(), latest_marker)
    .otherwise(F.col('ranking_col')),
)

In [89]:
# List (column, type) pairs, including the new ranking_col and ranks columns.
one_item_tbl2.dtypes

[('date', 'date'),
 ('item_code', 'string'),
 ('N_sales_tot', 'bigint'),
 ('item_Price', 'decimal(22,12)'),
 ('GOI', 'decimal(24,12)'),
 ('GOV', 'decimal(38,12)'),
 ('NOI', 'decimal(28,16)'),
 ('NOV', 'decimal(38,12)'),
 ('voucher_value', 'decimal(38,4)'),
 ('tot_costs', 'decimal(26,10)'),
 ('Margin', 'decimal(31,6)'),
 ('delivery_weeks', 'bigint'),
 ('ranking_col', 'int'),
 ('ranks', 'int')]

In [99]:
# Collapse each (item_code, delivery_weeks, GOV, ranks) group into one row:
# first and last date, averaged sales / margin / costs / voucher value, the
# natural log of GOV, and `bin` = number of calendar days spanned (inclusive).
# `ranks` is used for grouping only and is not part of the output.
run_keys = [F.col(name) for name in ('item_code', 'delivery_weeks', 'GOV', 'ranks')]
run_stats = [
    F.min(F.col('date')).alias('min_date'),
    F.max(F.col('date')).alias('max_date'),
    F.avg(F.col('N_sales_tot')).alias('avg_sales'),
    F.avg(F.col('Margin')).alias('avg_Margin'),
    F.avg(F.col('tot_costs')).alias('avg_tot_costs'),
    F.avg(F.col('voucher_value')).alias('avg_voucher_value'),
]
output_exprs = [
    'min_date',
    'max_date',
    'item_code',
    'GOV',
    'log_price',
    'delivery_weeks',
    'avg_sales',
    'avg_Margin',
    'avg_tot_costs',
    'avg_voucher_value',
    'CAST (datediff(max_date, min_date ) + 1 as int) AS bin',
]
grouped_df = (
    one_item_tbl2
    .groupBy(*run_keys)
    .agg(*run_stats)
    .withColumn('log_price', F.log(F.col('GOV')))
    .orderBy(F.col('item_code'), F.col('min_date').asc())
    .selectExpr(*output_exprs)
)

In [100]:
# Preview the per-run summary rows.
grouped_df.show()

+----------+----------+------------------+---------------+------------------+--------------+------------------+--------------+-----------------+-----------------+---+
|  min_date|  max_date|         item_code|            GOV|         log_price|delivery_weeks|         avg_sales|    avg_Margin|    avg_tot_costs|avg_voucher_value|bin|
+----------+----------+------------------+---------------+------------------+--------------+------------------+--------------+-----------------+-----------------+---+
|2020-01-01|2020-01-01|000000001000015884|89.990000000000| 4.499698553045857|             3|               3.0| 77.9400000000|49.64000000000000|             0E-8|  1|
|2020-01-02|2020-01-02|000000001000015884|89.990000000000| 4.499698553045857|             2|               5.0|129.9000000000|49.64000000000000|             0E-8|  1|
|2020-01-03|2020-01-03|000000001000015884|85.490000000000| 4.448399410038142|             2|               3.0| 66.8100000000|49.57000000000000|       4.33330000|  1