# Импорт библиотек

In [10]:
# Standard library
import datetime
import os
import sys

# PySpark
from pyspark.sql import SparkSession
import pyspark.sql.types as typ
import pyspark.sql.functions as func

In [3]:
# Make Spark's Python workers use exactly the interpreter running this kernel.
# A bare "python" is looked up on PATH and may resolve to another installation
# (or not exist at all), which can cause a driver/worker Python mismatch.
os.environ["PYSPARK_PYTHON"] = sys.executable
# NOTE(review): the driver variables below are presumably only consulted when
# Spark is started through the `pyspark` launcher script (to open Jupyter);
# they are kept unchanged for compatibility — confirm whether still needed.
os.environ["PYSPARK_DRIVER_PYTHON"] = "jupyter"
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"] = "notebook"

# Работа со Spark

## Создаём сессию
Веб-интерфейс Spark UI будет доступен по адресу [http://localhost:4040/](http://localhost:4040/).

In [4]:
# Create SparkSession 
# Local Spark session using 6 worker threads and 8g of driver memory.
# getOrCreate() returns the already running session if this cell is re-run.
spark = (
    SparkSession.builder
    .appName("SparkSecond")
    .master("local[6]")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)

## Задание 1.

### Заполняем фрейм.

In [27]:
# Task 1 input: events as (user id, unix timestamp in seconds).
columns = ['id', 'timestamp']
data = [
    (1, 1562007679), (1, 1562007710), (1, 1562007720), (1, 1562007750),
    (2, 1564682430), (2, 1564682450), (2, 1564682480),
]
first_df = spark.createDataFrame(data, schema=columns)
# Convert epoch seconds directly into a TimestampType column. Formatting to a
# local-time string with from_unixtime() and casting back is lossy for
# ambiguous wall-clock times (e.g. DST fall-back), so it is avoided here.
first_df = first_df.withColumn('date_time', func.timestamp_seconds('timestamp'))
first_df.show()

+---+----------+-------------------+
| id| timestamp|          date_time|
+---+----------+-------------------+
|  1|1562007679|2019-07-02 00:01:19|
|  1|1562007710|2019-07-02 00:01:50|
|  1|1562007720|2019-07-02 00:02:00|
|  1|1562007750|2019-07-02 00:02:30|
|  2|1564682430|2019-08-01 23:00:30|
|  2|1564682450|2019-08-01 23:00:50|
|  2|1564682480|2019-08-01 23:01:20|
+---+----------+-------------------+



### Получаем длительность сессии в секундах

In [44]:
# Session length per id: seconds between the last and the first event.
# Casting a timestamp to long yields epoch seconds, so the difference is in seconds.
diff_df = (
    first_df
    .groupBy('id')
    .agg(
        (func.max('date_time').cast("long") - func.min('date_time').cast("long"))
        .alias('session_in_secs')
    )
)
diff_df.show()

+---+---------------+
| id|session_in_secs|
+---+---------------+
|  1|             71|
|  2|             50|
+---+---------------+



## Задание 2.

### Заполняем фреймы.

In [45]:
# Task 2: demand per (product, location) — presumably per day, since it is
# multiplied by the number of days in a week later on.
columns = ['product', 'location', 'demand']
data = [
    [1, 1, 100], [1, 2, 110],
    [2, 1, 120], [2, 2, 90],
    [3, 1, 70], [3, 2, 80],
]
first_df = spark.createDataFrame(data, schema=columns)
first_df.show()

+-------+--------+------+
|product|location|demand|
+-------+--------+------+
|      1|       1|   100|
|      1|       2|   110|
|      2|       1|   120|
|      2|       2|    90|
|      3|       1|    70|
|      3|       2|    80|
+-------+--------+------+



In [46]:
# Stock on hand per (product, location). Note: there are no rows for product 3.
columns = ['product', 'location', 'stock']
data = [
    [1, 1, 1000], [1, 2, 400],
    [2, 1, 300], [2, 2, 250],
]
second_df = spark.createDataFrame(data, schema=columns)
second_df.show()

+-------+--------+-----+
|product|location|stock|
+-------+--------+-----+
|      1|       1| 1000|
|      1|       2|  400|
|      2|       1|  300|
|      2|       2|  250|
+-------+--------+-----+



### Объединим датафреймы

In [49]:
# Inner join on the composite key (product, location): only pairs present in
# both frames are kept, so product 3 (demand but no stock rows) is dropped.
# NOTE(review): confirm an inner join is intended rather than a left join.
joined_df = first_df.join(second_df, on=['product', 'location'], how='inner')
joined_df.show()

+-------+--------+------+-----+
|product|location|demand|stock|
+-------+--------+------+-----+
|      1|       1|   100| 1000|
|      1|       2|   110|  400|
|      2|       1|   120|  300|
|      2|       2|    90|  250|
+-------+--------+------+-----+



### Создадим ещё один фрейм с номерами недель и количеством дней в каждой из них

In [47]:
# Week numbers and the number of days each week covers
# (4 + 7 + 7 + 7 + 5 = 30 days; presumably one month split into weeks).
columns = ['week', 'days']
data = [
    [0, 4], [1, 7], [2, 7], [3, 7], [4, 5],
]
third_df = spark.createDataFrame(data, schema=columns)
third_df.show()

+----+----+
|week|days|
+----+----+
|   0|   4|
|   1|   7|
|   2|   7|
|   3|   7|
|   4|   5|
+----+----+



### Готовим витрину

In [55]:
# Expose the inputs as SQL views; they stay registered, so later cells may
# also query them by name.
joined_df.createOrReplaceTempView("sales_data")
third_df.createOrReplaceTempView("weeks")

# Combine every (product, location) pair with every week. An explicit
# CROSS JOIN states the cartesian product instead of an implicit comma join.
vitrina = (
    spark.sql("select * from sales_data cross join weeks")
    # Separate withColumn calls: remaining_stock refers to sales_calc, and a
    # column created in the same withColumns() projection is only resolvable
    # on Spark versions that support lateral column aliases.
    .withColumn('sales_calc', func.col('demand') * func.col('days'))
    .withColumn('remaining_stock', func.col('stock') - func.col('sales_calc'))
    # NOTE(review): remaining_stock is computed per week from the full initial
    # stock, not cumulatively across weeks (weeks 1-3 give identical values);
    # confirm this matches the task statement.
    .select('product', 'location', 'week', 'sales_calc', 'remaining_stock')
)
vitrina.show()

+-------+--------+----+----------+---------------+
|product|location|week|sales_calc|remaining_stock|
+-------+--------+----+----------+---------------+
|      1|       1|   0|       400|            600|
|      1|       2|   0|       440|            -40|
|      2|       1|   0|       480|           -180|
|      2|       2|   0|       360|           -110|
|      1|       1|   1|       700|            300|
|      1|       2|   1|       770|           -370|
|      2|       1|   1|       840|           -540|
|      2|       2|   1|       630|           -380|
|      1|       1|   2|       700|            300|
|      1|       2|   2|       770|           -370|
|      2|       1|   2|       840|           -540|
|      2|       2|   2|       630|           -380|
|      1|       1|   3|       700|            300|
|      1|       2|   3|       770|           -370|
|      2|       1|   3|       840|           -540|
|      2|       2|   3|       630|           -380|
|      1|       1|   4|       5