# Capa Consumption - Procesamiento de datos

A continuación, leeremos los distintos ficheros de la capa anterior y aplicaremos las transformaciones oportunas.

Los archivos a procesar son:
- FactInternetSales
- DimCustomer
- DimDate
- DimProduct
- DimSalesTerritory

En esta capa lo único que haremos es copiar los ficheros procesados en la capa anterior, crear un fichero adicional **_FactSalesSummary_** y crear las tablas Delta de todos los ficheros.

In [41]:
from pyspark.sql.types import *
from pyspark.sql.functions import concat, lit, datediff, current_date, when, col, substring, sum, length, countDistinct

StatementMeta(, 222788d6-2333-4ea4-8dd9-2325e9188b21, 43, Finished, Available)

#### FactSalesSummary

In [42]:
# Build the monthly sales summary from the curated FactInternetSales dataset.
# The result has one row per (ProductKey, CustomerKey, SalesTerritoryKey,
# OrderYear, OrderMonth) with the summed sales amount, summed item quantity and
# the number of distinct sales orders, plus a DateKey for the first day of the
# month. `sum` here is pyspark.sql.functions.sum (imported above), not the builtin.
df_fact_summary = (
    spark.read.format("parquet").load("Files/CURATED/FactInternetSales.parquet")
    # OrderDateKey is sliced as yyyymmdd text. Spark's substring() is 1-based,
    # so the year starts at position 1 and the month at position 5.
    .withColumn('OrderYear', substring(col('OrderDateKey'), 1, 4).cast('integer'))
    .withColumn('OrderMonth', substring(col('OrderDateKey'), 5, 2).cast('integer'))
    .groupBy('ProductKey', 'CustomerKey', 'SalesTerritoryKey', 'OrderYear', 'OrderMonth')
    .agg(
        sum('TotalSales').alias('TotalSalesAmount'),
        sum('OrderQuantity').alias('TotalItems'),
        countDistinct('SalesOrderNumber').alias('TotalOrders'),
    )
    # yyyymm01 as an integer: arithmetic on the integer year/month replaces the
    # manual string zero-padding and stays null when either part is null.
    .withColumn(
        'DateKey',
        (col('OrderYear') * 10000 + col('OrderMonth') * 100 + 1).cast('integer'),
    )
)

df_fact_summary.printSchema()
display(df_fact_summary)

# Persist the summary as a parquet dataset in the CONSUMPTION layer.
df_fact_summary.write.mode("overwrite").parquet('Files/CONSUMPTION/FactSalesSummary.parquet')
print ("FactSalesSummary agregada y guardada")


StatementMeta(, 222788d6-2333-4ea4-8dd9-2325e9188b21, 44, Finished, Available)

root
 |-- ProductKey: integer (nullable = true)
 |-- CustomerKey: integer (nullable = true)
 |-- SalesTerritoryKey: integer (nullable = true)
 |-- OrderYear: integer (nullable = true)
 |-- OrderMonth: integer (nullable = true)
 |-- TotalSalesAmount: double (nullable = true)
 |-- TotalItems: long (nullable = true)
 |-- TotalOrders: long (nullable = false)
 |-- DateKey: integer (nullable = true)



SynapseWidget(Synapse.DataFrame, 7b285493-dda4-4d3b-b36e-339efa78745c)

FactSalesSummary agregada y guardada


#### FactInternetSales

In [31]:
# FactInternetSales: read the curated parquet dataset and write it, unchanged,
# to the CONSUMPTION layer (replacing any previous copy).
df_fact_internet_sales = spark.read.parquet("Files/CURATED/FactInternetSales.parquet")

(
    df_fact_internet_sales.write
    .format("parquet")
    .mode("overwrite")
    .save("Files/CONSUMPTION/FactInternetSales.parquet")
)
print("FactInternetSales guardada en capa CONSUMPTION")


StatementMeta(, 222788d6-2333-4ea4-8dd9-2325e9188b21, 33, Finished, Available)

FactInternetSales guardada en capa CONSUMPTION


#### DimCustomer

In [32]:
# DimCustomer: read the curated parquet dataset and write it, unchanged,
# to the CONSUMPTION layer (replacing any previous copy).
df_dim_customer = spark.read.parquet("Files/CURATED/DimCustomer.parquet")

(
    df_dim_customer.write
    .format("parquet")
    .mode("overwrite")
    .save("Files/CONSUMPTION/DimCustomer.parquet")
)
print("DimCustomer guardada en capa CONSUMPTION")

StatementMeta(, 222788d6-2333-4ea4-8dd9-2325e9188b21, 34, Finished, Available)

DimCustomer guardada en capa CONSUMPTION


#### DimDate

In [33]:
# DimDate: read the curated parquet dataset and write it, unchanged,
# to the CONSUMPTION layer (replacing any previous copy).
df_dim_date = spark.read.parquet("Files/CURATED/DimDate.parquet")

(
    df_dim_date.write
    .format("parquet")
    .mode("overwrite")
    .save("Files/CONSUMPTION/DimDate.parquet")
)
print("DimDate guardada en capa CONSUMPTION")

StatementMeta(, 222788d6-2333-4ea4-8dd9-2325e9188b21, 35, Finished, Available)

DimDate guardada en capa CONSUMPTION


#### DimProduct

In [34]:
# DimProduct: read the curated parquet dataset and write it, unchanged,
# to the CONSUMPTION layer (replacing any previous copy).
df_dim_product = spark.read.parquet("Files/CURATED/DimProduct.parquet")

(
    df_dim_product.write
    .format("parquet")
    .mode("overwrite")
    .save("Files/CONSUMPTION/DimProduct.parquet")
)
print("DimProduct guardada en capa CONSUMPTION")

StatementMeta(, 222788d6-2333-4ea4-8dd9-2325e9188b21, 36, Finished, Available)

DimProduct guardada en capa CONSUMPTION


#### DimSalesTerritory

In [35]:
# DimSalesTerritory: read the curated parquet dataset and write it, unchanged,
# to the CONSUMPTION layer (replacing any previous copy).
df_dim_sales_territory = spark.read.parquet("Files/CURATED/DimSalesTerritory.parquet")

(
    df_dim_sales_territory.write
    .format("parquet")
    .mode("overwrite")
    .save("Files/CONSUMPTION/DimSalesTerritory.parquet")
)
print("DimSalesTerritory guardada en capa CONSUMPTION")

StatementMeta(, 222788d6-2333-4ea4-8dd9-2325e9188b21, 37, Finished, Available)

DimSalesTerritory guardada en capa CONSUMPTION


## Creamos tablas delta

In [46]:
# Save every DataFrame built in the cells above as a Delta table.
# Each write uses overwrite mode with the mergeSchema option enabled; the list
# keeps the tables in the same order as they are produced in the notebook.
delta_targets = [
    ("fact_summary", df_fact_summary),
    ("fact_internet_sales", df_fact_internet_sales),
    ("dim_customer", df_dim_customer),
    ("dim_date", df_dim_date),
    ("dim_product", df_dim_product),
    ("dim_sales_territory", df_dim_sales_territory),
]

for table_name, frame in delta_targets:
    delta_writer = frame.write.format("delta").mode("overwrite").option("mergeSchema", "true")
    delta_writer.saveAsTable(table_name)

StatementMeta(, 222788d6-2333-4ea4-8dd9-2325e9188b21, 48, Finished, Available)

In [45]:
%%sql

-- Removes the Demo_Oscar.fact_summary table.
-- IF EXISTS keeps this cell from failing when the table is not there
-- (for example, when it has already been dropped by a previous run).
-- NOTE(review): this cell is placed after the cell that writes the
-- fact_summary Delta table. If Demo_Oscar is the notebook's default
-- lakehouse, a top-to-bottom run ends with that table dropped. Confirm
-- whether this is a one-off cleanup that should run before the tables
-- are created.
drop table if exists Demo_Oscar.fact_summary;

StatementMeta(, 222788d6-2333-4ea4-8dd9-2325e9188b21, 47, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>