### Problem Statement:
Design a scalable system capability to predict customers' purchase behavior based on their purchase history. Analyze customer purchase history and predict new incoming orders from customers in a given area, so that products can be stocked at the warehouse in advance to meet customer demand.
Implement the following functionality:
- Research how the required data can be collected from open sources, and implement a data-loading pipeline.
- Once the data has been collected, clean it so that it can be analyzed and a new model can be built to predict warehouse product requirements from likely incoming customer demand.

#### The entire pipeline should be designed using big data tools.


### Dataset Description

File Descriptions and Data Field Information

#### train.csv
* The training data, comprising time series of features store_nbr, family, and onpromotion as well as the target sales.
* store_nbr identifies the store at which the products are sold.
* family identifies the type of product sold.
* sales gives the total sales for a product family at a particular store at a given date. Fractional values are possible since products can be sold in fractional units (1.5 kg of cheese, for instance, as opposed to 1 bag of chips).
* onpromotion gives the total number of items in a product family that were being promoted at a store at a given date.

#### test.csv
* The test data, having the same features as the training data. You will predict the target sales for the dates in this file.
* The dates in the test data are for the 15 days after the last date in the training data.

#### stores.csv
* Store metadata, including city, state, type, and cluster.
* cluster is a grouping of similar stores.

#### oil.csv
* Daily oil price. Includes values during both the train and test data timeframes. (Ecuador is an oil-dependent country and its economic health is highly vulnerable to shocks in oil prices.)

#### holidays_events.csv
* Holidays and Events, with metadata
* NOTE: Pay special attention to the transferred column. A holiday that is transferred officially falls on that calendar day, but was moved to another date by the government. A transferred day is more like a normal day than a holiday. To find the day that it was actually celebrated, look for the corresponding row where type is Transfer. For example, the holiday Independencia de Guayaquil was transferred from 2012-10-09 to 2012-10-12, which means it was celebrated on 2012-10-12. Days that are type Bridge are extra days that are added to a holiday (e.g., to extend the break across a long weekend). These are frequently made up by the type Work Day, which is a day not normally scheduled for work (e.g., Saturday) that is meant to pay back the Bridge.
* Additional holidays are days added to a regular calendar holiday, as typically happens around Christmas (making Christmas Eve a holiday).
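The transfer rules above can be condensed into a small decision function. This is a plain-Python sketch, assuming each row carries the type and transferred fields described above; the helper name is_day_off and the set of day-off types are my reading of the note, not an official definition of the dataset.

```python
from datetime import date

# Assumption: the types the note describes as days off.
DAY_OFF_TYPES = {"Holiday", "Transfer", "Additional", "Bridge"}


def is_day_off(holiday_type: str, transferred: bool) -> bool:
    """Hypothetical helper: True if a holidays_events row marks a day actually taken off."""
    if transferred:
        # A transferred holiday behaves like a normal day on its original date.
        return False
    # Work Day rows (and any type not listed above) are treated as normal working days.
    return holiday_type in DAY_OFF_TYPES


rows = [
    (date(2012, 10, 9), "Holiday", True),     # Independencia de Guayaquil, moved away
    (date(2012, 10, 12), "Transfer", False),  # the day it was actually celebrated
]
days_off = [d for d, holiday_type, transferred in rows if is_day_off(holiday_type, transferred)]
print(days_off)  # [datetime.date(2012, 10, 12)]
```

In Spark the same rule becomes a row filter on the holidays DataFrame combining the negated transferred column with an isin() check on type.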

#### Additional Notes
* Wages in the public sector are paid every two weeks, on the 15th and on the last day of the month. Supermarket sales could be affected by this.
* A magnitude 7.8 earthquake struck Ecuador on April 16, 2016. People rallied in relief efforts donating water and other first need products which greatly affected supermarket sales for several weeks after the earthquake.
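Both notes translate naturally into date-based features. A plain-Python sketch follows; the helper names are hypothetical, and the 4-week earthquake window is an assumption standing in for "several weeks", to be tuned against the data.

```python
import calendar
from datetime import date

EARTHQUAKE_DATE = date(2016, 4, 16)  # from the note above


def is_payday(d: date) -> bool:
    """True on the 15th and on the last day of the month (public-sector paydays)."""
    days_in_month = calendar.monthrange(d.year, d.month)[1]
    return d.day in (15, days_in_month)


def in_earthquake_window(d: date, weeks: int = 4) -> bool:
    """True from the earthquake day through the following `weeks` weeks (assumed window)."""
    days_after = (d - EARTHQUAKE_DATE).days
    return 0 <= days_after < weeks * 7


print(is_payday(date(2016, 2, 29)), is_payday(date(2016, 2, 28)))  # True False
print(in_earthquake_window(date(2016, 5, 13)), in_earthquake_window(date(2016, 5, 14)))  # True False
```

Inside the Spark pipeline the payday rule could be expressed with column functions instead, for example comparing dayofmonth("date") with 15, or to_date("date") with last_day("date"), both from pyspark.sql.functions.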

#### Importing Libraries

In [1]:
import pyspark
from pyspark.sql import SparkSession
import datetime
import time

#### Setting up the Spark session

In [2]:
spark = SparkSession.builder.appName("CustomerOrder").getOrCreate()

In [3]:
spark

Reading all the CSV files

In [4]:
df_holiday_event = spark.read.csv("holidays_events.csv", header=True, inferSchema=True)
df_oil = spark.read.csv("oil.csv", header=True, inferSchema=True)
df_stores = spark.read.csv("stores.csv", header=True, inferSchema=True)
df_trans = spark.read.csv("transactions.csv", header=True, inferSchema=True)
df_train = spark.read.csv("train.csv", header=True, inferSchema=True)


In [5]:
df_holiday_event.show(5)

+-------------------+-------+--------+-----------+--------------------+-----------+
|               date|   type|  locale|locale_name|         description|transferred|
+-------------------+-------+--------+-----------+--------------------+-----------+
|2012-03-02 00:00:00|Holiday|   Local|      Manta|  Fundacion de Manta|      false|
|2012-04-01 00:00:00|Holiday|Regional|   Cotopaxi|Provincializacion...|      false|
|2012-04-12 00:00:00|Holiday|   Local|     Cuenca| Fundacion de Cuenca|      false|
|2012-04-14 00:00:00|Holiday|   Local|   Libertad|Cantonizacion de ...|      false|
|2012-04-21 00:00:00|Holiday|   Local|   Riobamba|Cantonizacion de ...|      false|
+-------------------+-------+--------+-----------+--------------------+-----------+
only showing top 5 rows



In [6]:
df_oil.show(5)

+-------------------+----------+
|               date|dcoilwtico|
+-------------------+----------+
|2013-01-01 00:00:00|      null|
|2013-01-02 00:00:00|     93.14|
|2013-01-03 00:00:00|     92.97|
|2013-01-04 00:00:00|     93.12|
|2013-01-07 00:00:00|      93.2|
+-------------------+----------+
only showing top 5 rows



In [7]:
df_stores.show(5)

+---------+-------------+--------------------+-----+-------+
|store_nbr|         city|               state|type1|cluster|
+---------+-------------+--------------------+-----+-------+
|        1|        Quito|           Pichincha|    D|     13|
|        2|        Quito|           Pichincha|    D|     13|
|        3|        Quito|           Pichincha|    D|      8|
|        4|        Quito|           Pichincha|    D|      9|
|        5|Santo Domingo|Santo Domingo de ...|    D|      4|
+---------+-------------+--------------------+-----+-------+
only showing top 5 rows



In [8]:
df_trans.show(5)

+-------------------+---------+------------+
|               date|store_nbr|transactions|
+-------------------+---------+------------+
|2013-01-01 00:00:00|       25|         770|
|2013-01-02 00:00:00|        1|        2111|
|2013-01-02 00:00:00|        2|        2358|
|2013-01-02 00:00:00|        3|        3487|
|2013-01-02 00:00:00|        4|        1922|
+-------------------+---------+------------+
only showing top 5 rows



In [9]:
df_train.show(5)

+---+-------------------+---------+----------+-----+-----------+
| id|               date|store_nbr|    family|sales|onpromotion|
+---+-------------------+---------+----------+-----+-----------+
|  0|2013-01-01 00:00:00|        1|AUTOMOTIVE|  0.0|          0|
|  1|2013-01-01 00:00:00|        1| BABY CARE|  0.0|          0|
|  2|2013-01-01 00:00:00|        1|    BEAUTY|  0.0|          0|
|  3|2013-01-01 00:00:00|        1| BEVERAGES|  0.0|          0|
|  4|2013-01-01 00:00:00|        1|     BOOKS|  0.0|          0|
+---+-------------------+---------+----------+-----+-----------+
only showing top 5 rows



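#### train.csv is the file that contains the store sales records
* I enrich train.csv by joining the other files to it, which adds holiday, oil-price, store and transaction details to each sales record.

Here I use the join function. The holiday, oil and transaction tables share the 'date' column with the train table, and the store and transaction tables share 'store_nbr', so these columns serve as the join keys (foreign keys linking the other tables to the train table). Note that join performs an inner join by default, so only train rows with a matching key in every joined table are kept.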

In [10]:
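# join() defaults to an inner join: rows without a match are dropped
df = df_train.join(df_holiday_event, ["date"])   # holiday/event details per date
df = df.join(df_oil, ['date'])                    # daily oil price
df = df.join(df_stores, ['store_nbr'])            # store city, state, type and cluster
df = df.join(df_trans, ['date', 'store_nbr'])     # daily transaction count per store
df.show(5)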

+-------------------+---------+---+----------+-----+-----------+-------+--------+-----------+------------------+-----------+----------+-------+-----------+-----+-------+------------+
|               date|store_nbr| id|    family|sales|onpromotion|   type|  locale|locale_name|       description|transferred|dcoilwtico|   city|      state|type1|cluster|transactions|
+-------------------+---------+---+----------+-----+-----------+-------+--------+-----------+------------------+-----------+----------+-------+-----------+-----+-------+------------+
|2013-01-01 00:00:00|       25|561|AUTOMOTIVE|  0.0|          0|Holiday|National|    Ecuador|Primer dia del ano|      false|      null|Salinas|Santa Elena|    D|      1|         770|
|2013-01-01 00:00:00|       25|562| BABY CARE|  0.0|          0|Holiday|National|    Ecuador|Primer dia del ano|      false|      null|Salinas|Santa Elena|    D|      1|         770|
|2013-01-01 00:00:00|       25|563|    BEAUTY|  2.0|          0|Holiday|National|    

In [11]:
df = df.withColumnRenamed('type', 'holiday_type')
df = df.withColumnRenamed('type1', 'store_type')
df = df.withColumnRenamed('family', 'product_category')


In [12]:
df.show(2)

+-------------------+---------+---+----------------+-----+-----------+------------+--------+-----------+------------------+-----------+----------+-------+-----------+----------+-------+------------+
|               date|store_nbr| id|product_category|sales|onpromotion|holiday_type|  locale|locale_name|       description|transferred|dcoilwtico|   city|      state|store_type|cluster|transactions|
+-------------------+---------+---+----------------+-----+-----------+------------+--------+-----------+------------------+-----------+----------+-------+-----------+----------+-------+------------+
|2013-01-01 00:00:00|       25|561|      AUTOMOTIVE|  0.0|          0|     Holiday|National|    Ecuador|Primer dia del ano|      false|      null|Salinas|Santa Elena|         D|      1|         770|
|2013-01-01 00:00:00|       25|562|       BABY CARE|  0.0|          0|     Holiday|National|    Ecuador|Primer dia del ano|      false|      null|Salinas|Santa Elena|         D|      1|         770|
+----

#### Removing unneeded columns

In [13]:
df1 = df.select(['id','store_nbr','date','product_category','onpromotion','holiday_type','locale',
'transferred','dcoilwtico','store_type','cluster','transactions','sales'])

In [14]:
df1.show(5)

+---+---------+-------------------+----------------+-----------+------------+--------+-----------+----------+----------+-------+------------+-----+
| id|store_nbr|               date|product_category|onpromotion|holiday_type|  locale|transferred|dcoilwtico|store_type|cluster|transactions|sales|
+---+---------+-------------------+----------------+-----------+------------+--------+-----------+----------+----------+-------+------------+-----+
|561|       25|2013-01-01 00:00:00|      AUTOMOTIVE|          0|     Holiday|National|      false|      null|         D|      1|         770|  0.0|
|562|       25|2013-01-01 00:00:00|       BABY CARE|          0|     Holiday|National|      false|      null|         D|      1|         770|  0.0|
|563|       25|2013-01-01 00:00:00|          BEAUTY|          0|     Holiday|National|      false|      null|         D|      1|         770|  2.0|
|564|       25|2013-01-01 00:00:00|       BEVERAGES|          0|     Holiday|National|      false|      null|   

#### Checking the data types of all the columns

In [15]:
df1.dtypes

[('id', 'int'),
 ('store_nbr', 'int'),
 ('date', 'timestamp'),
 ('product_category', 'string'),
 ('onpromotion', 'int'),
 ('holiday_type', 'string'),
 ('locale', 'string'),
 ('transferred', 'boolean'),
 ('dcoilwtico', 'double'),
 ('store_type', 'string'),
 ('cluster', 'int'),
 ('transactions', 'int'),
 ('sales', 'double')]

#### Checking the total number of rows in the DataFrame

In [16]:
df1.count()

322047

#### Creating a new index for the DataFrame
* After the join operations, the original 'id' values no longer form a consecutive sequence, so a fresh 0-based index is added.

In [17]:
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql.window import Window

# Number rows in their current order; row_number() starts at 1, so subtract 1 for a 0-based index
df1 = df1.withColumn('index', row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)
df1.show(3)

+---+---------+-------------------+----------------+-----------+------------+--------+-----------+----------+----------+-------+------------+-----+-----+
| id|store_nbr|               date|product_category|onpromotion|holiday_type|  locale|transferred|dcoilwtico|store_type|cluster|transactions|sales|index|
+---+---------+-------------------+----------------+-----------+------------+--------+-----------+----------+----------+-------+------------+-----+-----+
|561|       25|2013-01-01 00:00:00|      AUTOMOTIVE|          0|     Holiday|National|      false|      null|         D|      1|         770|  0.0|    0|
|562|       25|2013-01-01 00:00:00|       BABY CARE|          0|     Holiday|National|      false|      null|         D|      1|         770|  0.0|    1|
|563|       25|2013-01-01 00:00:00|          BEAUTY|          0|     Holiday|National|      false|      null|         D|      1|         770|  2.0|    2|
+---+---------+-------------------+----------------+-----------+------------

In [18]:
df2 = df1.select(['index','store_nbr','date','product_category','onpromotion','holiday_type','locale',
'transferred','dcoilwtico','store_type','cluster','transactions','sales'])
df2.show(2)

+-----+---------+-------------------+----------------+-----------+------------+--------+-----------+----------+----------+-------+------------+-----+
|index|store_nbr|               date|product_category|onpromotion|holiday_type|  locale|transferred|dcoilwtico|store_type|cluster|transactions|sales|
+-----+---------+-------------------+----------------+-----------+------------+--------+-----------+----------+----------+-------+------------+-----+
|    0|       25|2013-01-01 00:00:00|      AUTOMOTIVE|          0|     Holiday|National|      false|      null|         D|      1|         770|  0.0|
|    1|       25|2013-01-01 00:00:00|       BABY CARE|          0|     Holiday|National|      false|      null|         D|      1|         770|  0.0|
+-----+---------+-------------------+----------------+-----------+------------+--------+-----------+----------+----------+-------+------------+-----+
only showing top 2 rows



### Checking for null values in all the columns

In [19]:
Dict_Null = {col:df2.filter(df2[col].isNull()).count() for col in df2.columns}
Dict_Null

{'index': 0,
 'store_nbr': 0,
 'date': 0,
 'product_category': 0,
 'onpromotion': 0,
 'holiday_type': 0,
 'locale': 0,
 'transferred': 0,
 'dcoilwtico': 22044,
 'store_type': 0,
 'cluster': 0,
 'transactions': 0,
 'sales': 0}

We have 22044 null values in the 'dcoilwtico' column.
- As seen above, 'dcoilwtico' has the data type double, so the missing values can be imputed with a statistic such as the mean, median or mode.

#### Listing the distinct values of the 'dcoilwtico' column

In [20]:
df2.select('dcoilwtico').distinct().collect()

[Row(dcoilwtico=94.74),
 Row(dcoilwtico=105.41),
 Row(dcoilwtico=97.1),
 Row(dcoilwtico=106.61),
 Row(dcoilwtico=91.23),
 Row(dcoilwtico=93.84),
 Row(dcoilwtico=99.18),
 Row(dcoilwtico=98.87),
 Row(dcoilwtico=94.25),
 Row(dcoilwtico=98.17),
 Row(dcoilwtico=None),
 Row(dcoilwtico=103.07),
 Row(dcoilwtico=97.14),
 Row(dcoilwtico=93.12),
 Row(dcoilwtico=105.34),
 Row(dcoilwtico=107.43),
 Row(dcoilwtico=101.92),
 Row(dcoilwtico=103.64),
 Row(dcoilwtico=95.25),
 Row(dcoilwtico=98.62),
 Row(dcoilwtico=97.01),
 Row(dcoilwtico=101.63),
 Row(dcoilwtico=102.17),
 Row(dcoilwtico=94.09),
 Row(dcoilwtico=90.74),
 Row(dcoilwtico=97.48),
 Row(dcoilwtico=105.47),
 Row(dcoilwtico=107.13),
 Row(dcoilwtico=95.13),
 Row(dcoilwtico=53.3),
 Row(dcoilwtico=99.69),
 Row(dcoilwtico=103.81),
 Row(dcoilwtico=102.76),
 Row(dcoilwtico=52.72),
 Row(dcoilwtico=106.83),
 Row(dcoilwtico=56.78),
 Row(dcoilwtico=53.56),
 Row(dcoilwtico=77.16),
 Row(dcoilwtico=50.12),
 Row(dcoilwtico=88.89),
 Row(dcoilwtico=104.76),
 Row

#### Computing basic summary statistics for all the columns

In [21]:
df2.describe().show()

+-------+-----------------+------------------+----------------+-----------------+------------+--------+-----------------+----------+-----------------+------------------+------------------+
|summary|            index|         store_nbr|product_category|      onpromotion|holiday_type|  locale|       dcoilwtico|store_type|          cluster|      transactions|             sales|
+-------+-----------------+------------------+----------------+-----------------+------------+--------+-----------------+----------+-----------------+------------------+------------------+
|  count|           322047|            322047|          322047|           322047|      322047|  322047|           300003|    322047|           322047|            322047|            322047|
|   mean|         161023.0|26.994671585203402|            null|3.727136101252302|        null|    null|64.07791222084047|      null|8.531201967414693|1734.1178399426171| 406.3834521170104|
| stddev|92967.10540831095|15.595174164123565|         

#### Treating the null values

In [22]:
# 64.077 is the (rounded) mean of 'dcoilwtico' reported by describe() above
df2 = df2.na.fill(64.077, 'dcoilwtico')

In [23]:
Dict_Null = {col:df2.filter(df2[col].isNull()).count() for col in df2.columns}
Dict_Null

{'index': 0,
 'store_nbr': 0,
 'date': 0,
 'product_category': 0,
 'onpromotion': 0,
 'holiday_type': 0,
 'locale': 0,
 'transferred': 0,
 'dcoilwtico': 0,
 'store_type': 0,
 'cluster': 0,
 'transactions': 0,
 'sales': 0}

### All the null values are now taken care of.

In [24]:
df2.show(2)

+-----+---------+-------------------+----------------+-----------+------------+--------+-----------+----------+----------+-------+------------+-----+
|index|store_nbr|               date|product_category|onpromotion|holiday_type|  locale|transferred|dcoilwtico|store_type|cluster|transactions|sales|
+-----+---------+-------------------+----------------+-----------+------------+--------+-----------+----------+----------+-------+------------+-----+
|    0|       25|2013-01-01 00:00:00|      AUTOMOTIVE|          0|     Holiday|National|      false|    64.077|         D|      1|         770|  0.0|
|    1|       25|2013-01-01 00:00:00|       BABY CARE|          0|     Holiday|National|      false|    64.077|         D|      1|         770|  0.0|
+-----+---------+-------------------+----------------+-----------+------------+--------+-----------+----------+----------+-------+------------+-----+
only showing top 2 rows



#### The 'date' column has the data type timestamp
- We can extract the year, month, day, day of the week, hour and minute from it to make the analysis more specific. Because every timestamp in this data is at midnight, hour and minute will always be 00.

In [25]:
#from pyspark.sql.functions import *
#df2.withColumn("Date",to_date(current_timestamp())).show(truncate=False)

In [31]:
from pyspark.sql.functions import date_format

# date_format returns zero-padded strings such as '2013', '01' and 'Tue'
df2 = (df2.withColumn("year", date_format("date", "yyyy"))
          .withColumn("month", date_format("date", "MM"))
          .withColumn("day", date_format("date", "dd"))
          .withColumn("hour", date_format("date", "HH"))
          .withColumn("minute", date_format("date", "mm"))
          .withColumn("day_of_week", date_format("date", "EE")))  # abbreviated weekday name

In [32]:
df2.show(1)

+-----+---------+-------------------+----------------+-----------+------------+--------+-----------+----------+----------+-------+------------+-----+----+-----+---+----+------+-----------+
|index|store_nbr|               date|product_category|onpromotion|holiday_type|  locale|transferred|dcoilwtico|store_type|cluster|transactions|sales|year|month|day|hour|minute|day_of_week|
+-----+---------+-------------------+----------------+-----------+------------+--------+-----------+----------+----------+-------+------------+-----+----+-----+---+----+------+-----------+
|    0|       25|2013-01-01 00:00:00|      AUTOMOTIVE|          0|     Holiday|National|      false|    64.077|         D|      1|         770|  0.0|2013|   01| 01|  00|    00|        Tue|
+-----+---------+-------------------+----------------+-----------+------------+--------+-----------+----------+----------+-------+------------+-----+----+-----+---+----+------+-----------+
only showing top 1 row



#### Storing the final result in JSON file format

In [2]:
# Spark writes a directory named final_result.json holding JSON Lines part files
df2.write.json("final_result.json")
