# Creating Gold Layer

Remember to run the previous notebooks first.

## Utilities

In [0]:
# Schema that holds the Northwind tables, and the DBFS location of the Gold layer.
database_name = "northwind"
gold_layer_path = "dbfs:/FileStore/Northwind/Gold/"

# Make the schema current so the cells below can use unqualified table names.
spark.sql(f"USE SCHEMA {database_name};")

Out[1]: DataFrame[]

## Creating Gold Layer

4 tables will be created:
- products_sales: contains information about the number of times each product was sold.
- customer_number_of_purchases: contains information about how many times the customers made a purchase.
- customer_value_of_purchases: contains information about how much the customers spent on purchases.
- employees_sales_per_year: contains information about the number of sales the employees made per year.

#### Table: "products_sales"
Using **PySpark**

In [0]:
table_name = "products_sales"

# Number of si_order_details rows per product_id (column "count").
df_product_count = spark.table('si_order_details').groupby('product_id').count()
df_products = spark.table('si_products')

# Attach the product name to each count, highest count first, and expose the
# count under the name "sales".
same_product = df_product_count.product_id == df_products.product_id
df_products_sales = (
    df_product_count
    .join(df_products, on=same_product)
    .select('product_name', 'count')
    .orderBy('count', ascending=False)
    .withColumnRenamed('count', 'sales')
)

# Overwrite the gold Delta table, tag it with quality=gold and refresh the
# catalog's cached metadata for it.
df_products_sales.write.saveAsTable(f"gl_{table_name}", format="delta", mode="overwrite")
spark.sql(f"ALTER TABLE gl_{table_name} SET TBLPROPERTIES('quality'='gold')")
spark.catalog.refreshTable(f"gl_{table_name}")

#### Table: "customer_number_of_purchases"
Using **Koalas**

In [0]:
!pip install koalas  # Needed in the community edition (as the cluster has to be recreated)

Collecting koalas
  Downloading koalas-1.8.2-py3-none-any.whl (390 kB)
[?25l[K     |▉                               | 10 kB 20.7 MB/s eta 0:00:01[K     |█▊                              | 20 kB 5.8 MB/s eta 0:00:01[K     |██▌                             | 30 kB 8.2 MB/s eta 0:00:01[K     |███▍                            | 40 kB 4.0 MB/s eta 0:00:01[K     |████▏                           | 51 kB 4.4 MB/s eta 0:00:01[K     |█████                           | 61 kB 5.2 MB/s eta 0:00:01[K     |█████▉                          | 71 kB 5.7 MB/s eta 0:00:01[K     |██████▊                         | 81 kB 6.4 MB/s eta 0:00:01[K     |███████▌                        | 92 kB 6.1 MB/s eta 0:00:01[K     |████████▍                       | 102 kB 4.8 MB/s eta 0:00:01[K     |█████████▎                      | 112 kB 4.8 MB/s eta 0:00:01[K     |██████████                      | 122 kB 4.8 MB/s eta 0:00:01[K     |███████████                     | 133 kB 4.8 MB/s eta 0:00:01[K  

In [0]:
import databricks.koalas as ks

table_name = "customer_number_of_purchases"

# Per-customer non-null counts of every si_orders column; the order_id count
# is the one kept below as the number of purchases.
orders_per_customer = ks.read_table('si_orders').groupby('customer_id').count()

# Customers indexed by customer_id so the join below matches on the index.
customers_by_id = ks.read_table('si_customers').set_index('customer_id')

kept_columns = ['order_id', 'company_name', 'contact_name', 'contact_title']
df_customer_number_of_purchases = (
    orders_per_customer
    .join(customers_by_id)[kept_columns]
    .rename(columns={'order_id': 'number_of_purchases'})
)

# Overwrite the gold Delta table, tag it with quality=gold and refresh the
# catalog's cached metadata for it.
df_customer_number_of_purchases.to_table(f"gl_{table_name}", format="delta", mode="overwrite")
spark.sql(f"ALTER TABLE gl_{table_name} SET TBLPROPERTIES('quality'='gold')")
spark.catalog.refreshTable(f"gl_{table_name}")



#### Table: "customer_value_of_purchases"

Using the **pandas API on Spark** (`pyspark.pandas`)

In [0]:
import pyspark.pandas as ps

table_name = "customer_value_of_purchases"

df_order_details = ps.DataFrame(spark.table('si_order_details'))
df_orders = ps.DataFrame(spark.table('si_orders')).set_index('order_id')
df_customers = ps.DataFrame(spark.table('si_customers')).set_index('customer_id')

# Value of one order line = quantity * unit price * (1 - discount).
# The previous version multiplied by product_id (an identifier) instead of the
# quantity ordered, which made every total meaningless.
# NOTE(review): assumes si_order_details exposes the Northwind `quantity`
# column under that snake_case name -- confirm against the silver notebook.
df_order_details['order_value'] = (
    df_order_details['quantity'].astype("float")
    * df_order_details['unit_price'].astype("float")
    * (1 - df_order_details['discount'].astype("float"))
)

# Dedicated name so the frame built in the "customer_number_of_purchases"
# cell is not silently overwritten.
df_customer_value_of_purchases = (
    df_order_details
    .groupby('order_id')['order_value']    # aggregate only the column we need
    .sum()                                 # total value per order
    .to_frame()
    .join(df_orders[['customer_id']])      # index join on order_id -> customer_id
    .groupby('customer_id')
    .sum()                                 # total value per customer
    .join(df_customers)[['order_value', 'company_name', 'contact_name', 'contact_title']]
    .rename(
        {'order_value': 'value_of_purchases'},
        axis=1
    )
)

# Overwrite the gold Delta table, tag it with quality=gold and refresh the
# catalog's cached metadata for it.
df_customer_value_of_purchases.to_table(
    name=f"gl_{table_name}",
    format="delta",
    mode="overwrite",
    mergeSchema="true"
)
spark.sql(f"ALTER TABLE gl_{table_name} SET TBLPROPERTIES('quality'='gold')")
spark.catalog.refreshTable(f"gl_{table_name}")

#### Table: "employees_sales_per_year"

Using **Spark SQL**

In [0]:
table_name = "employees_sales_per_year"

# Drop any previous version of the table so the CREATE TABLE below can run
# again on a re-execution of the notebook.
spark.sql(f"DROP TABLE IF EXISTS gl_{table_name};")

# Create the table from a query (CTAS) and tag it with quality=gold:
#   1. the CTE `sales_per_emp_per_year` counts order_id per employee_id and
#      YEAR(order_date) in si_orders (GROUP BY `year` presumably resolves to
#      that select alias -- verify si_orders has no column named `year`);
#   2. the outer SELECT inner-joins those counts to si_employees on
#      employee_id to add first_name and last_name.
spark.sql(f"""
    CREATE TABLE gl_{table_name}
    TBLPROPERTIES('quality'='gold')
    AS SELECT *
    FROM (
    
    WITH sales_per_emp_per_year AS (
        SELECT
            employee_id,
            YEAR(order_date) as year,
            COUNT(order_id) AS number_of_sales
        FROM si_orders
        GROUP BY employee_id, year
    )
        
    SELECT
        si_employees.employee_id,
        si_employees.first_name,
        si_employees.last_name,
        sales_per_emp_per_year.year,
        sales_per_emp_per_year.number_of_sales
        FROM si_employees
        JOIN sales_per_emp_per_year
            ON si_employees.employee_id = sales_per_emp_per_year.employee_id
    
    );
""")

Out[6]: DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]