In [1]:
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

# Creating the spark session

In [2]:
# Build (or reuse) a local-mode Spark session for this notebook; the Spark UI
# port is set to 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Alkemy_task1")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

# Bare expression so the notebook renders the session summary.
spark

## Loading and exploration of prices_competitor dataset

In [3]:
# Load the competitor price file: the first line is the header and the column
# types are inferred from the data.
df = spark.read.csv(
    path="all/prices_competitor.csv",
    inferSchema=True,
    header=True,
)
df.printSchema()

root
 |-- comp_date: timestamp (nullable = true)
 |-- seller_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- price: integer (nullable = true)



Checking the number of rows and taking a glance at the dataframe

In [4]:
# count() is a Spark action, so this scans the whole input.
print("Numbers of rows: ", df.count())
# Preview the first 10 rows.
df.show(10)

Numbers of rows:  8473036
+-------------------+---------+----------+-----+
|          comp_date|seller_id|product_id|price|
+-------------------+---------+----------+-----+
|2021-01-01 00:00:00|       23|    110064|27990|
|2021-01-01 00:00:00|       24|    110064|27990|
|2021-01-01 00:00:00|       26|    110064|27990|
|2021-01-01 00:00:00|       41|    110064|27490|
|2021-01-01 00:00:00|       48|    110064|27990|
|2021-01-01 00:00:00|      180|    110064|28990|
|2021-01-01 00:00:00|      407|    110064|28590|
|2021-01-01 00:00:00|       24|    151052|10499|
|2021-01-01 00:00:00|       26|    151052|10499|
|2021-01-01 00:00:00|       48|    151052|11666|
+-------------------+---------+----------+-----+
only showing top 10 rows



Some more explorations

Check the mean of price, the maximum and the minimum

In [5]:
# Summary statistics (count, mean, stddev, min, max) per column.
df.describe().show()

# Mean, maximum and minimum of `price`, each computed as a separate
# single-value aggregate (the same figures also appear in describe() above).
df.agg({'price': 'mean'}).show()
df.agg({'price': 'max'}).show()
df.agg({'price': 'min'}).show()

+-------+------------------+------------------+------------------+
|summary|         seller_id|        product_id|             price|
+-------+------------------+------------------+------------------+
|  count|           8473036|           8473036|           8473036|
|   mean|111.75951606956468|140221.17376852877|18565.406793739578|
| stddev| 139.6180774294958|18635.810777512015|25574.826571561833|
|    min|                23|            100043|                 0|
|    max|               490|            184913|           1040290|
+-------+------------------+------------------+------------------+

+------------------+
|        avg(price)|
+------------------+
|18565.406793739578|
+------------------+

+----------+
|max(price)|
+----------+
|   1040290|
+----------+

+----------+
|min(price)|
+----------+
|         0|
+----------+



Taking a look at the average price per product, to get familiar with the data

In [6]:
# Average price per product across all sellers and dates (show() prints the first 20 groups).
df.groupBy('product_id').agg({'price': 'mean'}).show()

+----------+------------------+
|product_id|        avg(price)|
+----------+------------------+
|    122128|10575.842040185471|
|    134205|2477.2226613965745|
|    139024|2858.0975166565718|
|    141533| 3406.744647105472|
|    134748| 36847.23794549266|
|    133577| 28574.76983503535|
|    146224|3917.4110169491523|
|    136631|1699.1234662576687|
|    144907|  6395.12962962963|
|    139747| 46087.40094339623|
|    139379|2600.2449592449593|
|    142545|15066.153846153846|
|    140541|29621.069143067303|
|    143737|23410.799708667153|
|    143032| 718.8254716981132|
|    176809| 4459.459218642906|
|    154033| 8001.885190725505|
|    154034| 10368.75064935065|
|    155042|3721.8970588235293|
|    153555| 64377.50558284949|
+----------+------------------+
only showing top 20 rows



Check whether there are null values in the columns

In [7]:
# One aggregate per column: count() skips nulls, and when() without an
# otherwise() yields null for non-matching rows, so each aggregate counts
# exactly the rows where that column is null.
null_counts = [
    F.count(F.when(F.col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(null_counts).show()

+---------+---------+----------+-----+
|comp_date|seller_id|product_id|price|
+---------+---------+----------+-----+
|        0|        0|         0|    0|
+---------+---------+----------+-----+



### Check when the price has been changed twice in a day

First, we look at the number of unique changes in a day, for a given seller and product

In [8]:
# Keep a single row per (day, seller, product) key.
# NOTE(review): dropDuplicates does not specify which of the duplicated rows
# survives, so the retained `price` for a key with several prices is
# arbitrary. Confirm this is acceptable for the analysis.
dedup_keys = ["comp_date", "seller_id", "product_id"]
dropDisDF = df.dropDuplicates(dedup_keys)
print("Number of unique changes of prices in a day: " + str(dropDisDF.count()))
dropDisDF.show(truncate=False)

Number of unique changes of prices in a day: 7927677
+-------------------+---------+----------+-----+
|comp_date          |seller_id|product_id|price|
+-------------------+---------+----------+-----+
|2021-01-01 00:00:00|23       |100204    |649  |
|2021-01-01 00:00:00|23       |100205    |799  |
|2021-01-01 00:00:00|23       |100208    |899  |
|2021-01-01 00:00:00|23       |100212    |1299 |
|2021-01-01 00:00:00|23       |100405    |2199 |
|2021-01-01 00:00:00|23       |100406    |2699 |
|2021-01-01 00:00:00|23       |100421    |1599 |
|2021-01-01 00:00:00|23       |100594    |999  |
|2021-01-01 00:00:00|23       |100608    |299  |
|2021-01-01 00:00:00|23       |100612    |999  |
|2021-01-01 00:00:00|23       |100621    |1099 |
|2021-01-01 00:00:00|23       |100625    |990  |
|2021-01-01 00:00:00|23       |100629    |1099 |
|2021-01-01 00:00:00|23       |100645    |1699 |
|2021-01-01 00:00:00|23       |100651    |359  |
|2021-01-01 00:00:00|23       |100658    |259  |
|2021-01-01 00:0

Then we analyse which are the duplicated changes

In [9]:
# Count rows per (day, seller, product) key and keep the keys seen more than once.
key_counts = df.groupBy("comp_date", "seller_id", "product_id").count()
df1 = key_counts.where(F.col("count") > 1)
print(f'The number of time prices have been changed more than one time per day: {df1.count()}')
print('List of duplicated changes:')
df1.drop('count').show()

The number of time prices have been changed more than one time per day: 535362
List of duplicated changes:
+-------------------+---------+----------+
|          comp_date|seller_id|product_id|
+-------------------+---------+----------+
|2021-01-01 00:00:00|       24|    143732|
|2021-01-01 00:00:00|       24|    143736|
|2021-01-01 00:00:00|       24|    143737|
|2021-01-01 00:00:00|       24|    143738|
|2021-01-01 00:00:00|       24|    143739|
|2021-01-01 00:00:00|       24|    143809|
|2021-01-01 00:00:00|       24|    143832|
|2021-01-01 00:00:00|       24|    143836|
|2021-01-01 00:00:00|       24|    143865|
|2021-01-01 00:00:00|       24|    143914|
|2021-01-01 00:00:00|       24|    143928|
|2021-01-01 00:00:00|       24|    143941|
|2021-01-01 00:00:00|       24|    144054|
|2021-01-01 00:00:00|       24|    144113|
|2021-01-01 00:00:00|       24|    144119|
|2021-01-01 00:00:00|       24|    144194|
|2021-01-01 00:00:00|       24|    144603|
|2021-01-01 00:00:00|       24|  

We can, for instance, check that, for the first element of the above list, the price was indeed updated twice in a day

In [10]:
# Spot-check one duplicated key: every row recorded for seller 24 and
# product 143732 on 2021-01-01.
sample_key = (
    (F.col("comp_date") == '2021-01-01')
    & (F.col("seller_id") == 24)
    & (F.col("product_id") == 143732)
)
df.filter(sample_key).show()

+-------------------+---------+----------+-----+
|          comp_date|seller_id|product_id|price|
+-------------------+---------+----------+-----+
|2021-01-01 00:00:00|       24|    143732|21327|
|2021-01-01 00:00:00|       24|    143732|23105|
+-------------------+---------+----------+-----+



Now we have dropDisDF that has no duplicated values

Checking the number of rows in which the price is 0

In [11]:
# Flag rows priced at 0 (non-matching rows become null) and count the flags.
zero_price_flag = F.when(dropDisDF['price'] == 0, 1)
dropDisDF.select(F.count(zero_price_flag)).show()

+---------------------------------------+
|count(CASE WHEN (price = 0) THEN 1 END)|
+---------------------------------------+
|                                      5|
+---------------------------------------+



Removing the rows with a 0 in price values

In [12]:
# Discard the zero-price rows and report how many rows are left.
has_price = F.col("price") != 0
df_filtered = dropDisDF.where(has_price)
print(f'Number of rows remaining after the manipulations {df_filtered.count()}')

Number of rows remaining after the manipulations 7927672


### Find min and max price for each product on all the year

Grouping by product to find the maximum and minimum price, then joining the max and min price to the price_competitor dataset

In [13]:
# Per-product maximum and minimum price over the whole period, computed in a
# single aggregation and attached to every row with a single join (instead of
# two groupBy passes followed by two joins).
# The key is renamed because price_stats is derived from df_filtered itself;
# a distinct name keeps the join condition unambiguous.
price_stats = (
    df_filtered.groupBy("product_id")
    .agg(F.max("price"), F.min("price"))
    .withColumnRenamed("product_id", "product_id_stats")
)
# Resulting columns: the df_filtered columns, then `max(price)`, then `min(price)`.
joined_df = (
    df_filtered
    .join(price_stats, df_filtered.product_id == price_stats.product_id_stats, "left")
    .drop("product_id_stats")
)
# Writing to the "noop" format runs the full plan without persisting any output.
joined_df.write.mode("overwrite").format("noop").save()
joined_df.show()

+-------------------+---------+----------+-----+----------+----------+
|          comp_date|seller_id|product_id|price|max(price)|min(price)|
+-------------------+---------+----------+-----+----------+----------+
|2021-01-01 00:00:00|       23|    100612|  999|       999|       613|
|2021-01-01 00:00:00|       23|    100625|  990|     79300|       793|
|2021-01-01 00:00:00|       23|    101767| 1499|      2590|      1392|
|2021-01-01 00:00:00|       23|    100802|  899|      1090|       629|
|2021-01-01 00:00:00|       23|    100212| 1299|      1832|      1283|
|2021-01-01 00:00:00|       23|    100645| 1699|      1699|      1199|
|2021-01-01 00:00:00|       23|    100421| 1599|      2090|      1313|
|2021-01-01 00:00:00|       23|    100629| 1099|      1099|       690|
|2021-01-01 00:00:00|       23|    100658|  259|       259|       133|
|2021-01-01 00:00:00|       23|    102187|  349|       349|       169|
|2021-01-01 00:00:00|       23|    100204|  649|       667|       467|
|2021-

### Checking if in the price_competitor dataset there are products that are not sold by the seller 24

Loading the sales_data dataset

In [14]:
# Load the sales file; header row present, column types inferred.
sales = spark.read.csv(
    path="all/sales_data.csv",
    inferSchema=True,
    header=True,
)
sales.printSchema()

root
 |-- sale_date: timestamp (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- sales_price_tax: double (nullable = true)
 |-- regular_price_tax: double (nullable = true)
 |-- sales_price: double (nullable = true)
 |-- regular_price: double (nullable = true)
 |-- purchase_price: double (nullable = true)



Getting the unique product_id values from sales_data dataset

In [15]:
# One row per product id that appears in the sales data.
un_prod = sales.select(sales['product_id']).distinct()
print("Number of sales_data rows: ", sales.count())
print("Number of unique products: ", un_prod.count())

Number of sales_data rows:  119125
Number of unique products:  7529


Getting the unique product_id values from price_competitor dataset

In [16]:
# One row per product id that appears in the competitor price data.
un_prod_comp = joined_df.select('product_id').distinct()
# distinct() on this single-column frame already leaves one row per
# product_id, so it can be counted directly without a second de-duplication.
print('Number of unique products in price_competitor:', un_prod_comp.count())

Number of unique products in price_competitor: 6461


The number of products listed in sales_data is higher than the number in price_competitor, but we check anyway whether there are products in price_competitor that do not belong to sales_data

Taking the list of different products in sales data

In [17]:
# Bring the distinct sales product ids to the driver as a plain Python list.
prods = [row['product_id'] for row in un_prod.select('product_id').collect()]

Checking if there are products in price_competitor that are not in sales_data, and so are not sold by 24

In [18]:
# Keep only the competitor rows whose product also appears in sales_data.
# A left-semi join does the membership test inside Spark against `un_prod`
# (the distinct sales product ids) rather than embedding a driver-side list of
# thousands of literals in an isin() filter. It returns only the left-hand
# columns, in their original order, and never duplicates rows.
reduced = joined_df.join(
    un_prod,
    joined_df['product_id'] == un_prod['product_id'],
    "left_semi",
)
reduced.show()

+-------------------+---------+----------+-----+----------+----------+
|          comp_date|seller_id|product_id|price|max(price)|min(price)|
+-------------------+---------+----------+-----+----------+----------+
|2021-01-01 00:00:00|       23|    100612|  999|       999|       613|
|2021-01-01 00:00:00|       23|    100625|  990|     79300|       793|
|2021-01-01 00:00:00|       23|    101767| 1499|      2590|      1392|
|2021-01-01 00:00:00|       23|    100802|  899|      1090|       629|
|2021-01-01 00:00:00|       23|    100212| 1299|      1832|      1283|
|2021-01-01 00:00:00|       23|    100645| 1699|      1699|      1199|
|2021-01-01 00:00:00|       23|    100421| 1599|      2090|      1313|
|2021-01-01 00:00:00|       23|    100629| 1099|      1099|       690|
|2021-01-01 00:00:00|       23|    100658|  259|       259|       133|
|2021-01-01 00:00:00|       23|    102187|  349|       349|       169|
|2021-01-01 00:00:00|       23|    100204|  649|       667|       467|
|2021-

In [19]:
# Row count after restricting to products present in sales_data; compared in
# the text below with the count before the restriction.
print("Numbers of rows: ", reduced.count())

Numbers of rows:  7927672


In [22]:
# Number of distinct products left after the restriction.
print('Number of unique products in price_competitor:', reduced.select('product_id').distinct().count())

Number of unique products in price_competitor: 6461


Since there are no changes in either the number of rows or the number of unique products, all the products sold in price_competitor are also sold by seller 24, among others

### product_catalog load and join with price_competitor

In [23]:
# Load the product catalogue and discard the `coded_name` column in one step.
products = (
    spark.read.csv("all/product_catalog.csv", header=True, inferSchema=True)
    .drop('coded_name')
)
products.show()

+----------+----------+----------+----------+-----------+
|product_id|coded_cat1|coded_cat2|coded_cat3|coded_brand|
+----------+----------+----------+----------+-----------+
|    109844|      1776|      1504|       732|        367|
|    119374|      1776|      1504|       732|        367|
|    131012|      1776|      1504|      1907|        367|
|    131031|      1776|      1504|      1907|        367|
|    131306|      1776|      1504|      1907|        213|
|    131060|      1776|      1504|      1907|        367|
|    131061|      1776|      1504|      1907|        367|
|    141568|      1776|      1504|      1907|        513|
|    141570|      1776|      1504|      1907|        513|
|    141556|      1776|      1504|      2183|        513|
|    141558|      1776|      1504|      2183|        513|
|    141774|      1776|      1504|      2183|        513|
|    141700|      1776|      1504|      1621|        513|
|    141735|      1776|      1504|      1907|        513|
|    141642|  

Join the products category dataset with the one created before: containing price min and max for each product

In [24]:
# Left join keeps every price row; the catalogue's copy of the key is dropped
# so `product_id` appears only once in the result.
same_product = reduced.product_id == products.product_id
united = reduced.join(products, on=same_product, how="left").drop(products.product_id)
united.show()

+-------------------+---------+----------+-----+----------+----------+----------+----------+----------+-----------+
|          comp_date|seller_id|product_id|price|max(price)|min(price)|coded_cat1|coded_cat2|coded_cat3|coded_brand|
+-------------------+---------+----------+-----+----------+----------+----------+----------+----------+-----------+
|2021-01-01 00:00:00|       23|    100612|  999|       999|       613|      1375|      1760|      1348|        279|
|2021-01-01 00:00:00|       23|    100625|  990|     79300|       793|      1375|      1564|       489|        279|
|2021-01-01 00:00:00|       23|    101767| 1499|      2590|      1392|      1375|      1564|      2715|        279|
|2021-01-01 00:00:00|       23|    100802|  899|      1090|       629|      1375|      2519|      1338|        279|
|2021-01-01 00:00:00|       23|    100212| 1299|      1832|      1283|      1163|      1267|      4325|        445|
|2021-01-01 00:00:00|       23|    100645| 1699|      1699|      1199|  

### Adding seller name to the dataset

In [25]:
# Load the seller id -> seller name lookup table.
sellers = spark.read.csv(
    path="all/sellers_list.csv",
    inferSchema=True,
    header=True,
)
sellers.show()

+---------+-----------+
|seller_id|seller_name|
+---------+-----------+
|       48|          F|
|      180|          B|
|       24|          G|
|       41|          I|
|       26|          H|
|      490|          L|
|      188|          E|
|      407|          D|
|       23|          C|
+---------+-----------+



In [26]:
# Attach the seller name (left join, the lookup's key column is dropped).
with_seller = united.join(sellers, united.seller_id == sellers.seller_id, "left").drop(sellers.seller_id)
# Reduce the timestamp to a calendar date stored in a new `date` column.
with_date = with_seller.withColumn("date", F.to_date(F.col("comp_date"))).drop('comp_date')
# Fix the column order of the output.
output_columns = [
    'date', 'seller_id', 'seller_name', 'product_id', 'price',
    'min(price)', 'max(price)',
    'coded_cat1', 'coded_cat2', 'coded_cat3', 'coded_brand',
]
final = with_date.select(*output_columns)
final.show()

+----------+---------+-----------+----------+-----+----------+----------+----------+----------+----------+-----------+
|      date|seller_id|seller_name|product_id|price|min(price)|max(price)|coded_cat1|coded_cat2|coded_cat3|coded_brand|
+----------+---------+-----------+----------+-----+----------+----------+----------+----------+----------+-----------+
|2021-01-01|       23|          C|    100612|  999|       613|       999|      1375|      1760|      1348|        279|
|2021-01-01|       23|          C|    100625|  990|       793|     79300|      1375|      1564|       489|        279|
|2021-01-01|       23|          C|    101767| 1499|      1392|      2590|      1375|      1564|      2715|        279|
|2021-01-01|       23|          C|    100802|  899|       629|      1090|      1375|      2519|      1338|        279|
|2021-01-01|       23|          C|    100212| 1299|      1283|      1832|      1163|      1267|      4325|        445|
|2021-01-01|       23|          C|    100645| 16

In [27]:
#final.coalesce(1).write.csv('joined', mode='overwrite', sep='\t', header=True)
#final = spark.read.csv('joined', sep = '\t',header=True, inferSchema=True)

In [28]:
# Rebuild `comp_date` from `date`, restore the column order and sort the rows.
# NOTE: this cell consumes the `date` column, so it cannot be re-run on its
# own output; the previous cell has to be executed again first.
ordered_columns = [
    'comp_date', 'seller_id', 'seller_name', 'product_id', 'price',
    'min(price)', 'max(price)',
    'coded_cat1', 'coded_cat2', 'coded_cat3', 'coded_brand',
]
final = (
    final
    .withColumn('comp_date', F.to_date('date'))
    .drop('date')
    .select(*ordered_columns)
    .orderBy('comp_date', 'seller_id', 'product_id')
)
final.show()

+----------+---------+-----------+----------+-----+----------+----------+----------+----------+----------+-----------+
| comp_date|seller_id|seller_name|product_id|price|min(price)|max(price)|coded_cat1|coded_cat2|coded_cat3|coded_brand|
+----------+---------+-----------+----------+-----+----------+----------+----------+----------+----------+-----------+
|2021-01-01|       23|          C|    100204|  649|       467|       667|      1163|      1267|      4325|        445|
|2021-01-01|       23|          C|    100205|  799|       723|       930|      1163|      1267|      4325|        445|
|2021-01-01|       23|          C|    100208|  899|       855|      1221|      1163|      1267|      4325|        445|
|2021-01-01|       23|          C|    100212| 1299|      1283|      1832|      1163|      1267|      4325|        445|
|2021-01-01|       23|          C|    100404| 1299|      1095|      1590|      1375|      1760|      1410|        279|
|2021-01-01|       23|          C|    100405| 21

In [29]:
# Row count of the enriched frame, displayed as the cell result.
final.count()

7927672

In [30]:
# Print the column names and types of the enriched frame.
final.printSchema()

root
 |-- comp_date: date (nullable = true)
 |-- seller_id: integer (nullable = true)
 |-- seller_name: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- min(price): integer (nullable = true)
 |-- max(price): integer (nullable = true)
 |-- coded_cat1: integer (nullable = true)
 |-- coded_cat2: integer (nullable = true)
 |-- coded_cat3: integer (nullable = true)
 |-- coded_brand: integer (nullable = true)



## Splitting the dataset into quarters, plus one split only for November 2021

In [31]:
# Slice the data by calendar period using inclusive date bounds.
# `first` has no lower bound and `fourth` has no upper bound, so they also
# take in anything before 2021-01-01 / after 2021-12-31 if such rows exist.
# `november` is a subset of `fourth`.
comp_date = F.col('comp_date')
first = final.where(comp_date <= '2021-03-31')
second = final.where((comp_date >= '2021-04-01') & (comp_date <= '2021-06-30'))
third = final.where((comp_date >= '2021-07-01') & (comp_date <= '2021-09-30'))
fourth = final.where(comp_date >= '2021-10-01')
november = final.where((comp_date >= '2021-11-01') & (comp_date <= '2021-11-30'))

In [32]:
# Preview the first 10 rows of the first-quarter slice.
first.show(10)

+----------+---------+-----------+----------+-----+----------+----------+----------+----------+----------+-----------+
| comp_date|seller_id|seller_name|product_id|price|min(price)|max(price)|coded_cat1|coded_cat2|coded_cat3|coded_brand|
+----------+---------+-----------+----------+-----+----------+----------+----------+----------+----------+-----------+
|2021-01-01|       23|          C|    100204|  649|       467|       667|      1163|      1267|      4325|        445|
|2021-01-01|       23|          C|    100205|  799|       723|       930|      1163|      1267|      4325|        445|
|2021-01-01|       23|          C|    100208|  899|       855|      1221|      1163|      1267|      4325|        445|
|2021-01-01|       23|          C|    100212| 1299|      1283|      1832|      1163|      1267|      4325|        445|
|2021-01-01|       23|          C|    100404| 1299|      1095|      1590|      1375|      1760|      1410|        279|
|2021-01-01|       23|          C|    100405| 21

In [33]:
# Preview the second-quarter slice.
second.show()

+----------+---------+-----------+----------+-----+----------+----------+----------+----------+----------+-----------+
| comp_date|seller_id|seller_name|product_id|price|min(price)|max(price)|coded_cat1|coded_cat2|coded_cat3|coded_brand|
+----------+---------+-----------+----------+-----+----------+----------+----------+----------+----------+-----------+
|2021-04-01|       23|          C|    100204|  649|       467|       667|      1163|      1267|      4325|        445|
|2021-04-01|       23|          C|    100205|  799|       723|       930|      1163|      1267|      4325|        445|
|2021-04-01|       23|          C|    100212| 1299|      1283|      1832|      1163|      1267|      4325|        445|
|2021-04-01|       23|          C|    100404| 1299|      1095|      1590|      1375|      1760|      1410|        279|
|2021-04-01|       23|          C|    100405| 2199|      1866|      2590|      1375|      1760|      1410|        279|
|2021-04-01|       23|          C|    100406| 26

In [34]:
# Preview the third-quarter slice.
third.show()

+----------+---------+-----------+----------+-----+----------+----------+----------+----------+----------+-----------+
| comp_date|seller_id|seller_name|product_id|price|min(price)|max(price)|coded_cat1|coded_cat2|coded_cat3|coded_brand|
+----------+---------+-----------+----------+-----+----------+----------+----------+----------+----------+-----------+
|2021-07-01|       23|          C|    100204|  649|       467|       667|      1163|      1267|      4325|        445|
|2021-07-01|       23|          C|    100205|  799|       723|       930|      1163|      1267|      4325|        445|
|2021-07-01|       23|          C|    100212| 1299|      1283|      1832|      1163|      1267|      4325|        445|
|2021-07-01|       23|          C|    100404| 1299|      1095|      1590|      1375|      1760|      1410|        279|
|2021-07-01|       23|          C|    100405| 2199|      1866|      2590|      1375|      1760|      1410|        279|
|2021-07-01|       23|          C|    100406| 26

In [35]:
# Preview the fourth-quarter slice.
fourth.show()

+----------+---------+-----------+----------+-----+----------+----------+----------+----------+----------+-----------+
| comp_date|seller_id|seller_name|product_id|price|min(price)|max(price)|coded_cat1|coded_cat2|coded_cat3|coded_brand|
+----------+---------+-----------+----------+-----+----------+----------+----------+----------+----------+-----------+
|2021-10-01|       23|          C|    100204|  649|       467|       667|      1163|      1267|      4325|        445|
|2021-10-01|       23|          C|    100205|  799|       723|       930|      1163|      1267|      4325|        445|
|2021-10-01|       23|          C|    100212| 1299|      1283|      1832|      1163|      1267|      4325|        445|
|2021-10-01|       23|          C|    100404| 1299|      1095|      1590|      1375|      1760|      1410|        279|
|2021-10-01|       23|          C|    100405| 2199|      1866|      2590|      1375|      1760|      1410|        279|
|2021-10-01|       23|          C|    100406| 26

In [36]:
# Preview the November slice.
november.show()

+----------+---------+-----------+----------+-----+----------+----------+----------+----------+----------+-----------+
| comp_date|seller_id|seller_name|product_id|price|min(price)|max(price)|coded_cat1|coded_cat2|coded_cat3|coded_brand|
+----------+---------+-----------+----------+-----+----------+----------+----------+----------+----------+-----------+
|2021-11-01|       23|          C|    100409| 4290|      3644|      5190|      1375|      1760|      1410|        279|
|2021-11-01|       23|          C|    100594|  999|       785|      1190|      1375|      1760|      1410|        279|
|2021-11-01|       23|          C|    100633|  899|       790|       999|      1375|      1760|      1195|        279|
|2021-11-01|       23|          C|    100651|  359|       259|       399|      1375|      1760|      1348|        279|
|2021-11-01|       23|          C|    102301| 6999|      2662|      6999|      1617|      1181|      1044|        684|
|2021-11-01|       23|          C|    102892| 19

# Switch to pandas to make some price manipulation

In [37]:
# NOTE(review): this is a bare attribute access. Without parentheses the
# method is not called, so the session keeps running and the cell merely
# displays the bound-method repr. Calling stop() at this point would also be
# premature: the Spark DataFrames `first`, `second`, ... are only converted
# with toPandas() further down, and that needs a live session.
spark.stop

<bound method SparkSession.stop of <pyspark.sql.session.SparkSession object at 0x000001E1B854E380>>

In [38]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

Transform the Spark DataFrames into pandas DataFrames

In [39]:
# Materialise every Spark slice as an in-memory pandas DataFrame.
pdfirst = first.toPandas()
pdsecond = second.toPandas()
pdthird = third.toPandas()
pdfourth = fourth.toPandas()
pdnov = november.toPandas()

# toPandas() is a Spark action and needs a live session, so the session is
# stopped only now, once all the slices live in pandas. Everything below works
# on the pandas frames. Re-running this cell requires recreating the session
# (and the Spark DataFrames) first.
spark.stop()

In [40]:
# Row-to-row price difference within each (product, seller) series; the first
# row of every series has no predecessor and gets NaN.
# diff() follows the current row order — assumes each frame is already sorted
# by comp_date; TODO confirm the order survives the Spark -> pandas conversion.
pair_keys = ['product_id', 'seller_id']
for period_df in (pdfirst, pdsecond, pdthird, pdfourth, pdnov):
    period_df['price_change'] = period_df.groupby(pair_keys)['price'].diff()

In [41]:
def _signed_log(change):
    """Signed log transform: sign(x) * log(|x| + 1.01).

    The offset is added to the magnitude, outside the absolute value, so the
    transform is odd (f(-x) == -f(x)) and the log term is always positive.
    The previous form, log(|x + 1.01|), shifted the value before taking the
    absolute value: a change of -1 produced log(0.01) < 0 and hence a
    positive result for a price drop, and every negative change was
    compressed differently from the positive change of the same size.

    NaN inputs (the first observation of each series) stay NaN; a change of
    0 maps to 0 because sign(0) == 0.
    """
    return np.sign(change) * np.log(np.abs(change) + 1.01)


# Signed log of the price change for every period.
pdfirst['price_change_log'] = _signed_log(pdfirst['price_change'])
pdsecond['price_change_log'] = _signed_log(pdsecond['price_change'])
pdthird['price_change_log'] = _signed_log(pdthird['price_change'])
pdfourth['price_change_log'] = _signed_log(pdfourth['price_change'])
pdnov['price_change_log'] = _signed_log(pdnov['price_change'])

In [42]:
# Variance of the price changes of each (product, seller) series, broadcast
# back onto every row of that series.
series_keys = ['product_id', 'seller_id']
for period_df in (pdfirst, pdsecond, pdthird, pdfourth, pdnov):
    period_df['price_change_var'] = (
        period_df.groupby(series_keys)['price_change'].transform('var')
    )

In [43]:
# Drop every (product, seller) series whose price-change variance is exactly 0.
# NOTE(review): rows whose variance is NaN pass this filter as well, because
# NaN != 0 evaluates to True. Confirm that keeping them is intended.
pdfirst = pdfirst.loc[pdfirst['price_change_var'].ne(0)]
pdsecond = pdsecond.loc[pdsecond['price_change_var'].ne(0)]
pdthird = pdthird.loc[pdthird['price_change_var'].ne(0)]
pdfourth = pdfourth.loc[pdfourth['price_change_var'].ne(0)]
pdnov = pdnov.loc[pdnov['price_change_var'].ne(0)]

In [44]:
# First quarter

# Step 1: drop the multi-seller products that seller 24 does not sell.
# Build the set of sellers offering each product in this period.
seller_list = (
    pdfirst.groupby('product_id')['seller_id']
    .apply(set)
    .reset_index(name='seller_list')
)
has_many_sellers = seller_list['seller_list'].map(len) > 1
seller_list = seller_list[has_many_sellers]
lacks_24 = seller_list['seller_list'].map(lambda sellers: 24 not in sellers)
not_24 = seller_list[lacks_24]
pdfirst = pdfirst[~pdfirst['product_id'].isin(not_24['product_id'])]

# Step 2: drop the products offered by a single seller (recomputed on the
# reduced frame).
seller_list_new = (
    pdfirst.groupby('product_id')['seller_id']
    .apply(set)
    .reset_index(name='seller_list')
)
still_many_sellers = seller_list_new['seller_list'].map(len) > 1
seller_list_new = seller_list_new[still_many_sellers]
pdfirst = pdfirst[pdfirst['product_id'].isin(seller_list_new['product_id'])]

In [45]:
# Second quarter

# Step 1: drop the multi-seller products that seller 24 does not sell.
# Build the set of sellers offering each product in this period.
seller_list = (
    pdsecond.groupby('product_id')['seller_id']
    .apply(set)
    .reset_index(name='seller_list')
)
has_many_sellers = seller_list['seller_list'].map(len) > 1
seller_list = seller_list[has_many_sellers]
lacks_24 = seller_list['seller_list'].map(lambda sellers: 24 not in sellers)
not_24 = seller_list[lacks_24]
pdsecond = pdsecond[~pdsecond['product_id'].isin(not_24['product_id'])]

# Step 2: drop the products offered by a single seller (recomputed on the
# reduced frame).
seller_list_new = (
    pdsecond.groupby('product_id')['seller_id']
    .apply(set)
    .reset_index(name='seller_list')
)
still_many_sellers = seller_list_new['seller_list'].map(len) > 1
seller_list_new = seller_list_new[still_many_sellers]
pdsecond = pdsecond[pdsecond['product_id'].isin(seller_list_new['product_id'])]

In [46]:
# Third quarter

# Step 1: drop the multi-seller products that seller 24 does not sell.
# Build the set of sellers offering each product in this period.
seller_list = (
    pdthird.groupby('product_id')['seller_id']
    .apply(set)
    .reset_index(name='seller_list')
)
has_many_sellers = seller_list['seller_list'].map(len) > 1
seller_list = seller_list[has_many_sellers]
lacks_24 = seller_list['seller_list'].map(lambda sellers: 24 not in sellers)
not_24 = seller_list[lacks_24]
pdthird = pdthird[~pdthird['product_id'].isin(not_24['product_id'])]

# Step 2: drop the products offered by a single seller (recomputed on the
# reduced frame).
seller_list_new = (
    pdthird.groupby('product_id')['seller_id']
    .apply(set)
    .reset_index(name='seller_list')
)
still_many_sellers = seller_list_new['seller_list'].map(len) > 1
seller_list_new = seller_list_new[still_many_sellers]
pdthird = pdthird[pdthird['product_id'].isin(seller_list_new['product_id'])]

In [47]:
# Fourth quarter

# Step 1: drop the multi-seller products that seller 24 does not sell.
# Build the set of sellers offering each product in this period.
seller_list = (
    pdfourth.groupby('product_id')['seller_id']
    .apply(set)
    .reset_index(name='seller_list')
)
has_many_sellers = seller_list['seller_list'].map(len) > 1
seller_list = seller_list[has_many_sellers]
lacks_24 = seller_list['seller_list'].map(lambda sellers: 24 not in sellers)
not_24 = seller_list[lacks_24]
pdfourth = pdfourth[~pdfourth['product_id'].isin(not_24['product_id'])]

# Step 2: drop the products offered by a single seller (recomputed on the
# reduced frame).
seller_list_new = (
    pdfourth.groupby('product_id')['seller_id']
    .apply(set)
    .reset_index(name='seller_list')
)
still_many_sellers = seller_list_new['seller_list'].map(len) > 1
seller_list_new = seller_list_new[still_many_sellers]
pdfourth = pdfourth[pdfourth['product_id'].isin(seller_list_new['product_id'])]

In [48]:
# November

# Step 1: drop the multi-seller products that seller 24 does not sell.
# Build the set of sellers offering each product in this period.
seller_list = (
    pdnov.groupby('product_id')['seller_id']
    .apply(set)
    .reset_index(name='seller_list')
)
has_many_sellers = seller_list['seller_list'].map(len) > 1
seller_list = seller_list[has_many_sellers]
lacks_24 = seller_list['seller_list'].map(lambda sellers: 24 not in sellers)
not_24 = seller_list[lacks_24]
pdnov = pdnov[~pdnov['product_id'].isin(not_24['product_id'])]

# Step 2: drop the products offered by a single seller (recomputed on the
# reduced frame).
seller_list_new = (
    pdnov.groupby('product_id')['seller_id']
    .apply(set)
    .reset_index(name='seller_list')
)
still_many_sellers = seller_list_new['seller_list'].map(len) > 1
seller_list_new = seller_list_new[still_many_sellers]
pdnov = pdnov[pdnov['product_id'].isin(seller_list_new['product_id'])]

In [49]:
# Write each prepared period to its own CSV file for the downstream analysis.
# to_csv() is called with its defaults, so the DataFrame index is written as
# the first, unnamed column. The `dataset/` directory is not created here.
exports = (
    (pdfirst, 'dataset/first_quarter.csv'),
    (pdsecond, 'dataset/second_quarter.csv'),
    (pdthird, 'dataset/third_quarter.csv'),
    (pdfourth, 'dataset/fourth_quarter.csv'),
    (pdnov, 'dataset/november.csv'),
)
for period_df, csv_path in exports:
    period_df.to_csv(csv_path)