### Файлы

In [0]:
# List the files that have been uploaded to the DBFS FileStore tables folder.
uploaded_tables_dir = "dbfs:/FileStore/tables/"
dbutils.fs.ls(uploaded_tables_dir)

In [0]:
# Delete an uploaded file from DBFS.
# Left commented out, presumably so that "Run All" never deletes source data;
# uncomment (and adjust the path) only when a file really has to be removed.
# dbutils.fs.rm("dbfs:/FileStore/tables/sales_plan.xlsx")

### База данных

In [0]:
%sql
-- Recreate the "sales" database from scratch and make it the current database,
-- so that unqualified table names below (e.g. sales_fact) resolve into it.
-- CASCADE is required for re-runs: after the sales_fact table has been created
-- inside this database, a plain DROP DATABASE fails because the database is not empty.
drop database if exists sales cascade;
create database sales;
use sales;
show databases;

databaseName
default
sales


In [0]:
%sql
-- (Re)create table sales_fact backed by the uploaded CSV file:
-- the first row holds the column names and fields are separated by ";".
-- NOTE(review): no inferSchema option is set, so columns likely come in as strings;
-- the SUM(amount) queries below still work through implicit casting.
DROP TABLE IF EXISTS sales_fact;
CREATE TABLE sales_fact
USING CSV
OPTIONS (
  path "dbfs:/FileStore/tables/sales_fact.csv",
  header "true",
  sep ";"
)

In [0]:
%sql
-- Actual (fact) sales summed per city.
SELECT
  sf.city,
  SUM(sf.amount) AS total_amount_fact
FROM sales_fact AS sf
GROUP BY sf.city

city,total_amount_fact
3_7,98.0
1_3,198.0
2_4,66.0
2_5,58.0
3_8,155.0
4_11,136.0
4_12,89.0
1_1,187.0
1_2,115.0
2_6,229.0


### Датафреймы

In [0]:
%python
# Capture the per-city fact-sales aggregation as a Spark DataFrame
# (same query as the SQL cell above).
fact_by_city_query = "select sf.city, sum(sf.amount) as total_amount_fact from sales_fact as sf group by sf.city"
df_sales_fact = spark.sql(fact_by_city_query)

In [0]:
%python
# Extract the city/region reference data from a JSON file.
geo_json_path = "dbfs:/FileStore/tables/geo_json.json"
df_geo = spark.read.json(geo_json_path)

In [0]:
%python
# Extract the sales plan from an .xlsx file via the com.crealytics spark-excel
# data source: first row is the header, column types are inferred.
# The plan's "amount" column is renamed to total_amount_plan so it can sit
# next to total_amount_fact after the join further below.
sales_plan_path = "dbfs:/FileStore/tables/sales_plan.xlsx"
df_sales_plan = (
    spark.read.format("com.crealytics.spark.excel")
    .options(header="true", inferSchema="true")
    .load(sales_plan_path)
    .withColumnRenamed("amount", "total_amount_plan")
)

In [0]:
%python
# Attach city and region names to the per-city fact totals by matching the
# city code (inner join), then preview the first 5 joined rows.
city_code_matches = df_sales_fact.city == df_geo.code_city
df_sales_fact_ = df_sales_fact.join(df_geo, on=city_code_matches, how="inner")
display(df_sales_fact_.head(5))

city,total_amount_fact,code_city,code_region,name_city,name_region
3_7,98.0,3_7,3,city7,south
1_3,198.0,1_3,1,city3,north
2_4,66.0,2_4,2,city4,east
2_5,58.0,2_5,2,city5,east
3_8,155.0,3_8,3,city8,south


In [0]:
%python
# Keep only city name (exposed as "city"), region name and fact total,
# sorted by region and then by fact total ascending; preview 5 rows.
df_sales_fact_final = (
    df_sales_fact_
    .selectExpr("name_city AS city", "name_region", "total_amount_fact")
    .orderBy("name_region", "total_amount_fact")
)
display(df_sales_fact_final.head(5))

city,name_region,total_amount_fact
city5,east,58.0
city4,east,66.0
city6,east,229.0
city2,north,115.0
city1,north,187.0


In [0]:
%python
# Roll the city-level fact totals up to region level.
from pyspark.sql import functions as F

region_fact_sum = F.sum("total_amount_fact").alias("total_amount_fact")
df_sales_fact_group = (
    df_sales_fact_final
    .groupBy("name_region")
    .agg(region_fact_sum)
)
display(df_sales_fact_group)

name_region,total_amount_fact
west,340.0
east,353.0
north,500.0
south,309.0


In [0]:
%python
# Compare plan vs. fact per region and add two computed columns:
#   delta = round((fact - plan) / plan, 2)  -- relative deviation from plan.
#           It is null when plan or fact is null, and when plan == 0 under
#           Spark's non-ANSI division (in ANSI mode that division raises instead).
#   value = "+"  if delta > 0                        (fact above plan)
#           "-"  if SHORTFALL_THRESHOLD < delta <= 0 (moderate shortfall)
#           "--" if delta <= SHORTFALL_THRESHOLD     (severe shortfall)
#           null if delta is null. Previously these rows fell through to
#           otherwise("--") and were mislabelled as a severe shortfall.
# Regions present on only one side are dropped by the inner join.
SHORTFALL_THRESHOLD = -0.5

delta_col = F.round(
    (F.col("total_amount_fact") - F.col("total_amount_plan")) / F.col("total_amount_plan"),
    2,
)

df_sales = (
    df_sales_plan
    .join(df_sales_fact_group, df_sales_plan.region == df_sales_fact_group.name_region)
    .withColumn("delta", delta_col)
    .withColumn(
        "value",
        F.when(F.col("delta").isNull(), F.lit(None).cast("string"))
        .when(F.col("delta") > 0, "+")
        .when((F.col("delta") > SHORTFALL_THRESHOLD) & (F.col("delta") <= 0), "-")
        .otherwise("--"),
    )
    .drop("name_region")
)
display(df_sales)

region,total_amount_plan,total_amount_fact,delta,value
east,600.0,353.0,-0.41,-
north,700.0,500.0,-0.29,-
south,450.0,309.0,-0.31,-
west,500.0,340.0,-0.32,-


### Графики и диаграммы

In [0]:
%python
# Chart of fact sales per city (grouped by region) via Databricks display().
# NOTE(review): the chart type/axes are presumably configured in the cell's
# visualization settings in the UI, not in code. Confirm they are documented.
city_sales_chart_source = df_sales_fact_final
display(city_sales_chart_source)

city,name_region,total_amount_fact
city5,east,58.0
city4,east,66.0
city6,east,229.0
city2,north,115.0
city1,north,187.0
city3,north,198.0
city9,south,56.0
city7,south,98.0
city8,south,155.0
city12,west,89.0
