## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
# List of csv files
file_list = ['categories', 'customers', 'employee_territories', 'employees', 'orders',
            'orders_details', 'products', 'regions', 'shippers', 'suppliers',
            'territories']
# Parameters to open csv files (and save as parquet)
file_type = "csv"

infer_schema = "false"

first_row_is_header = "true"

delimiter = ","

dfs = {}
# For loop to load all the csv files and save every df as parquet
for file in file_list:
    # Loading the csv file
    file_location = f"dbfs:/FileStore/tables/{file}.csv"
    df = spark.read.format(file_type)\
        .option("inferSchema", infer_schema)\
        .option("header", first_row_is_header)\
        .option("sep", delimiter)\
        .load(file_location)
    dfs[file] = df
    # Save as parquet
    df.write.mode("overwrite").format("parquet").option("header", "true")\
    .save(f"dbfs:/FileStore/trusted/nw_data_{file}")

## Tabela Fato

In [0]:
fact_table = dfs["orders_details"].join(dfs["orders"][['orderid','customerid','employeeid','shipvia']],on='orderid', how='left')
fact_table = fact_table.join(dfs["products"][['productid','supplierid','categoryid']],on='productid', how='left')
fact_table = fact_table.join(dfs["employee_territories"][['employeeid','territoryid']],on='employeeid', how='left')
fact_table = fact_table.join(dfs["territories"][['territoryid','regionid']],on='territoryid', how='left')
fact_table = fact_table.withColumnRenamed('shipvia', 'shipperid')
dfs["fact_table"] = fact_table
display(fact_table)

territoryid,employeeid,productid,orderid,unitprice,quantity,discount,customerid,shipperid,supplierid,categoryid,regionid
14450,5,11,10248,14.0,12,0.0,VINET,3,5,4,1
11747,5,11,10248,14.0,12,0.0,VINET,3,5,4,1
10038,5,11,10248,14.0,12,0.0,VINET,3,5,4,1
10019,5,11,10248,14.0,12,0.0,VINET,3,5,4,1
8837,5,11,10248,14.0,12,0.0,VINET,3,5,4,1
7960,5,11,10248,14.0,12,0.0,VINET,3,5,4,1
2903,5,11,10248,14.0,12,0.0,VINET,3,5,4,1
14450,5,42,10248,9.8,10,0.0,VINET,3,20,5,1
11747,5,42,10248,9.8,10,0.0,VINET,3,20,5,1
10038,5,42,10248,9.8,10,0.0,VINET,3,20,5,1


## Criar Views Temporárias

In [0]:
# Create a view or table
for key in dfs:
    temp_table_name = key

    dfs[key].createOrReplaceTempView(temp_table_name)

In [0]:
%sql

/* Query the created temp table in a SQL cell */

select * from `fact_table`

territoryid,employeeid,productid,orderid,unitprice,quantity,discount,customerid,shipperid,supplierid,categoryid,regionid
14450,5,11,10248,14.0,12,0.0,VINET,3,5,4,1
11747,5,11,10248,14.0,12,0.0,VINET,3,5,4,1
10038,5,11,10248,14.0,12,0.0,VINET,3,5,4,1
10019,5,11,10248,14.0,12,0.0,VINET,3,5,4,1
8837,5,11,10248,14.0,12,0.0,VINET,3,5,4,1
7960,5,11,10248,14.0,12,0.0,VINET,3,5,4,1
2903,5,11,10248,14.0,12,0.0,VINET,3,5,4,1
14450,5,42,10248,9.8,10,0.0,VINET,3,20,5,1
11747,5,42,10248,9.8,10,0.0,VINET,3,20,5,1
10038,5,42,10248,9.8,10,0.0,VINET,3,20,5,1


## Salval Tabelas no Databricks

In [0]:
# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.

for key in dfs:
    permanent_table_name = key

    dfs[key].write.mode("overwrite").format("parquet").saveAsTable(permanent_table_name, path=f"dbfs:/user/hive/tables/{key}")

## Dependências

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import lit
from pyspark.sql.functions import *

## Questão 1

In [0]:
display(dfs["fact_table"].groupBy("productid").agg(F.sum('quantity')).join(dfs["products"][['productid','productname']],on='productid', how='left').sort("sum(quantity)", ascending=True).take(3))

productid,sum(quantity),productname
37,511.0,Gravad lax
48,554.0,Chocolade
9,596.0,Mishi Kobe Niku


## Questão 2

In [0]:
display(dfs["fact_table"].groupBy("customerid").count().join(dfs["customers"][['customerid','contactname']],on='customerid', how='left').sort("count", ascending=False).take(5))

customerid,count,contactname
SAVEA,572,Jose Pavarotti
ERNSH,501,Roland Mendel
QUICK,400,Horst Kloss
HUNGO,308,Patricia McKenna
QUEEN,247,Lúcia Carvalho


## Questão 3

In [0]:
dfs["fact_table"] = dfs["fact_table"].withColumn("totalprice",dfs["fact_table"].quantity * dfs["fact_table"].unitprice)

In [0]:
display(dfs["fact_table"].groupBy("customerid").agg(F.sum('totalprice')).join(dfs["customers"][['customerid','contactname']],on='customerid', how='left').sort("sum(totalprice)", ascending=False).take(5))

customerid,sum(totalprice),contactname
SAVEA,659854.2000000005,Jose Pavarotti
QUICK,614910.6599999999,Horst Kloss
ERNSH,578581.69,Roland Mendel
HUNGO,316150.7600000001,Patricia McKenna
RATTC,223851.60000000003,Paula Wilson


## Questão 4

In [0]:
dfs["fact_table"] = dfs["fact_table"].join(dfs["orders"][['orderid','orderdate']],on='orderid', how='left')

In [0]:
dfs_employee = dfs["fact_table"].select(dfs["fact_table"].orderid,
                                 dfs["fact_table"].employeeid,
                                 dfs["fact_table"].totalprice
                                 ,year(col("orderdate")).alias("year")
                                 ,month(col("orderdate")).alias("month"))
display(dfs_employee.groupBy(["employeeid", "year", "month"]).agg(F.sum('totalprice')).join(dfs["employees"][['employeeid','firstname', 'lastname']],on='employeeid', how='left').sort(col("year").desc(), col("month").desc(), col("sum(totalprice)").desc()).take(15))

employeeid,year,month,sum(totalprice),firstname,lastname
4,1998,5,18825.0,Margaret,Peacock
2,1998,5,15214.5,Andrew,Fuller
1,1998,5,14107.500000000002,Nancy,Davolio
8,1998,5,12893.44,Laura,Callahan
7,1998,5,11730.499999999993,Robert,King
7,1998,4,347926.49999999994,Robert,King
2,1998,4,228767.35000000003,Andrew,Fuller
9,1998,4,66510.5,Anne,Dodsworth
3,1998,4,57195.8,Janet,Leverling
8,1998,4,56221.20000000001,Laura,Callahan


## Questão 5

In [0]:
display(dfs["fact_table"].join(dfs["customers"][['customerid', 'region']],on='customerid', how='left').groupBy(["customerid","region"]).count().groupBy(["region"]).count().sort("count", ascending=False).take(5))

region,count
,58
SP,6
OR,4
WA,3
RJ,3
