In [1]:
import os
import sys
sys.path.insert(0, os.path.abspath('./source/etl'))

from pyspark.sql import SparkSession
from SparkDBUtils import SparkDB
from delta import DeltaTable
import datetime as dt
import pyspark.sql
import pyspark.sql.functions as f
from pyspark.sql.types import DateType, StructType, StructField, IntegerType, TimestampType, StringType, FloatType
from pyspark.sql.window import Window

sparkdb = SparkDB()
spark = sparkdb.spark

In [29]:
simple_schema = StructType([
        StructField("date", DateType(), True),
        StructField("product", StringType(), True),
        StructField("product_id", StringType(), True),
        StructField("brand", StringType(), True),
        StructField("price", FloatType(), True),
        StructField("categories", StringType(), True),
        StructField("unit_price", FloatType(), True),
        StructField("units", StringType(), True),
        StructField("discount", FloatType(), True),
        StructField("ts_load", TimestampType(), True)
    ])

dataset = spark.read.option("delimiter", ";") \
            .csv("../../dataset/dataset_test_2.csv", schema=simple_schema, header=True)

# Merge Test

Inicialización

In [52]:
dataset.show()

+----------+--------------------+----------+-----------------+-----+--------------------+----------+-----+--------+-------+
|      date|             product|product_id|            brand|price|          categories|unit_price|units|discount|ts_load|
+----------+--------------------+----------+-----------------+-----+--------------------+----------+-----+--------+-------+
|2022-11-21|DIA MARI MARINERA...|      8456|DIA MARI MARINERA| 7.99|['Congelados', 'P...|      9.99|€/Kg.|    null|   null|
+----------+--------------------+----------+-----------------+-----+--------------------+----------+-----+--------+-------+



In [53]:
DeltaTable.forName(spark, "producto_dim").delete()

In [54]:
product_dim_new = dataset\
    .withColumn("categoria", f.lit(None)) \
    .select(["product_id",
             "product",
             "units",
             "brand",
             "categories",
             "categoria",
             "date"])
    
sparkdb.write_table(product_dim_new, "producto_dim", "append", "id_producto")

In [55]:
spark.table("producto_dim").show()

+-----------+--------------------+-----------------+--------------------+----------+----------+---------+-----+-------+
|id_producto|             product|            brand|          categories|product_id|      date|categoria|units|ts_load|
+-----------+--------------------+-----------------+--------------------+----------+----------+---------+-----+-------+
|       6928|DIA MARI MARINERA...|DIA MARI MARINERA|['Congelados', 'P...|      8456|2022-11-21|     null|€/Kg.|   null|
+-----------+--------------------+-----------------+--------------------+----------+----------+---------+-----+-------+



Segundo dataset

In [56]:
db = spark.table("producto_dim").alias("db").select("product_id","id_producto")

dataset2 = spark.read.option("delimiter", ";") \
            .csv("../../dataset/dataset_test.csv", schema=simple_schema, header=True).alias("dt")

In [57]:
dataset2.show()

+----------+--------------------+----------+-----------------+-----+--------------------+----------+-----+--------+-------+
|      date|             product|product_id|            brand|price|          categories|unit_price|units|discount|ts_load|
+----------+--------------------+----------+-----------------+-----+--------------------+----------+-----+--------+-------+
|2022-11-21|DIA MARI MARINERA...|      8456|DIA MARI MARINERA| 7.99|['Congelados', 'P...|      9.99|€/Kg.|    null|   null|
|2022-11-21|DIA ZUMOSFERA zum...|    277771|    DIA ZUMOSFERA| 1.05|['Bebidas', 'Zumos']|      1.75| €/l.|    null|   null|
+----------+--------------------+----------+-----------------+-----+--------------------+----------+-----+--------+-------+



In [58]:
final = dataset2.join(db, "product_id", "left").alias("f1")
final.show()

+----------+----------+--------------------+-----------------+-----+--------------------+----------+-----+--------+-------+-----------+
|product_id|      date|             product|            brand|price|          categories|unit_price|units|discount|ts_load|id_producto|
+----------+----------+--------------------+-----------------+-----+--------------------+----------+-----+--------+-------+-----------+
|      8456|2022-11-21|DIA MARI MARINERA...|DIA MARI MARINERA| 7.99|['Congelados', 'P...|      9.99|€/Kg.|    null|   null|       6928|
|    277771|2022-11-21|DIA ZUMOSFERA zum...|    DIA ZUMOSFERA| 1.05|['Bebidas', 'Zumos']|      1.75| €/l.|    null|   null|       null|
+----------+----------+--------------------+-----------------+-----+--------------------+----------+-----+--------+-------+-----------+



In [59]:
final2 = sparkdb.insert_id(final.where("id_producto is null"), "producto_dim", "id_producto").select("product_id","id_producto").alias("f2")

In [60]:
final2.show()

+----------+-----------+
|product_id|id_producto|
+----------+-----------+
|    277771|       6929|
+----------+-----------+



In [67]:
final.join(final2, "product_id", "left")\
    .select(["f1.product_id",f.coalesce(f.column("f1.id_producto"),f.column("f2.id_producto"))]).show()

+----------+----------------------------------+
|product_id|coalesce(id_producto, id_producto)|
+----------+----------------------------------+
|      8456|                              6928|
|    277771|                              6929|
+----------+----------------------------------+



# Fin Merge Test

In [4]:
# Ventana para obtener la ultima version de cada producto
window_spec = Window\
    .partitionBy("product_id")\
    .orderBy(f.col("date").desc())

# Nos quedamos con la ultima version de cada producto en el dataset,
# ya que se repiten en todas las fechas
product_dim_new = dataset\
    .withColumn("row_number", f.row_number().over(window_spec)) \
    .where("row_number = 1") \
    .select(["product_id",
             "product",
             "units",
             "brand",
             "categories",
             "date"])

In [19]:
DeltaTable.forName(spark, "producto_dim").delete()

In [20]:
spark.table("producto_dim").show()

+-----------+-------+-----+----------+----------+----+---------+-----+-------+
|id_producto|product|brand|categories|product_id|date|categoria|units|ts_load|
+-----------+-------+-----+----------+----------+----+---------+-----+-------+
+-----------+-------+-----+----------+----------+----+---------+-----+-------+



In [13]:
product_dim_new.where("product_id = 291364").select("product").show(truncate=False)

+-------------------------------------------------------------------------------+
|product                                                                        |
+-------------------------------------------------------------------------------+
|BISCUITS GALICIA galletas saladas con aceite de oliva virgen extra bolsa 200 gr|
+-------------------------------------------------------------------------------+



In [13]:
spark.table("producto_dim").groupby("product_id")\
    .agg(f.count("product_id").alias("cuenta"))\
    .where("cuenta > 1")\
    .show()

+----------+------+
|product_id|cuenta|
+----------+------+
|    291364|     2|
|    122969|     2|
|    288807|     2|
|    247479|     2|
|    218534|     2|
|    230021|     2|
|     48325|     2|
|    125714|     2|
|       234|     2|
|    242472|     2|
|    246502|     2|
|    129425|     2|
|     74305|     2|
|    144325|     3|
|    178096|     2|
|    263219|     2|
|    174067|     2|
|    197162|     2|
|    262429|     2|
|     62155|     2|
+----------+------+
only showing top 20 rows



In [6]:
DeltaTable.forName(spark, "producto_dia_fact").delete()

In [4]:
a = spark.table("precio_dia_agg_norm_fact")
a.summary().show()

+-------+------------------+------------------+-----------------+-----------------+--------------------+------------------------+------------------+-------------------+
|summary|           id_date|         sum_price|   sum_unit_price|     num_products| sum_price_ponderado|sum_unit_price_ponderado|    sum_price_norm|sum_unit_price_norm|
+-------+------------------+------------------+-----------------+-----------------+--------------------+------------------------+------------------+-------------------+
|  count|                40|                40|               40|               40|                  40|                      40|                40|                 40|
|   mean|              20.5|19157.161700528486|75706.94178423789|         5766.025|  3.3224462800255727|      13.136012865190631|0.7179146646917667|0.49032038748298323|
| stddev|11.690451944500118|1492.8646734958768|5735.428406341683|441.7045356285939|0.039774844549115305|      0.3038739220048766|0.2370321673717036|0.34065

In [85]:
fa = spark.table("producto_dia_fact").alias("fa")
d = spark.table("date_dim").alias("d")
d.printSchema()

root
 |-- id_date: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- ts_load: timestamp (nullable = true)



In [91]:
fa.join(d, "id_date")\
    .withColumn("anio",f.year("date"))\
    .groupby("anio").count().show()

+----+------+
|anio| count|
+----+------+
|2023| 82382|
|2022|148862|
+----+------+



In [50]:
df = spark.table("producto_dim")
df.write.saveAsTable("producto_dim2","delta","overwrite")

In [15]:
df2 = spark.table("producto_dim2")
df2.select("product_id").distinct().show()

In [17]:
df.summary().show()

+-------+----------------+--------------------+--------------+--------------------+------------------+-----------------+---------------+
|summary|     id_producto|             product|         brand|          categories|        product_id|        categoria|          units|
+-------+----------------+--------------------+--------------+--------------------+------------------+-----------------+---------------+
|  count|            6921|                6921|          6683|                6921|              6921|             6921|           6921|
|   mean|          3461.0|                null|          null|                null|185929.78613976465|             null|           null|
| stddev|1998.06493888462|                null|          null|                null| 93654.91515911062|             null|           null|
|    min|               1|1902 licor de gin...|ABADIA DA COVA|['Bebidas', 'Aguas']|               100|          Bebidas|          €/Kg.|
|    25%|            1730|               

In [22]:
df3 = df2.where("product_id < 117788")

In [51]:
a = delta.DeltaTable.forName(spark, "producto_dim2")
type(a)

delta.tables.DeltaTable

In [67]:
cols = ["id"]
data = [1,100]

dfb = spark.createDataFrame(data, pyspark.sql.types.IntegerType()).toDF(*cols)

In [74]:
l = list(dfb.select("id").toPandas()["id"])
",".join(l)

TypeError: sequence item 0: expected str instance, int found

In [52]:
a.toDF().show()

+-----------+--------------------+----------------+--------------------+----------+----------+-----------------+-----+--------------------+
|id_producto|             product|           brand|          categories|product_id|      date|        categoria|units|             ts_load|
+-----------+--------------------+----------------+--------------------+----------+----------+-----------------+-----+--------------------+
|          1|CARBONELL aceite ...|       CARBONELL|['Despensa', 'Ace...|       100|2022-11-21|         Despensa| €/l.|2023-02-26 19:35:...|
|          2|YBARRA salsa cock...|          YBARRA|['Despensa', 'Sal...|      1000|2022-11-21|         Despensa| €/l.|2023-02-26 19:35:...|
|          3|LU PRINCIPE Estre...|     LU PRINCIPE|['Despensa', 'Des...|    100267|2022-11-21|         Despensa|€/Kg.|2023-02-26 19:35:...|
|          4|LU PRINCIPE Estre...|     LU PRINCIPE|['Despensa', 'Des...|    100268|2022-11-21|         Despensa|€/Kg.|2023-02-26 19:35:...|
|          5|GULLON 

In [9]:
spark.sql("show tables in default").show()

+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
|  default|            date_dim|      false|
|  default|precio_dia_agg_no...|      false|
|  default|   producto_dia_fact|      false|
|  default|        producto_dim|      false|
|  default|       producto_dim2|      false|
|  default|       sequences_cfg|      false|
+---------+--------------------+-----------+



In [6]:
spark.sql("select * from sequences_cfg").show()

+------------+----+--------------------+
|  table_name|  id|             ts_load|
+------------+----+--------------------+
|producto_dim|6871|2023-02-26 19:32:...|
|    date_dim|  39|2023-02-26 19:32:...|
+------------+----+--------------------+



In [7]:
spark.sql("select * from producto_dia_fact").count()

224904

In [8]:
spark.sql("select * from precio_dia_agg_norm_fact").count()

39

In [3]:
producto_dia_fact = spark.table("producto_dia_fact").alias("pf")
date_dim = spark.table("date_dim").alias("d")
producto_dim = spark.table("producto_dim").alias("p")

In [4]:
p = producto_dia_fact\
    .join(date_dim, "id_date")\
    .join(producto_dim, "id_producto")

subset = p.select(f.column("pf.price"),
        f.column("d.date"),
        f.column("p.product"))\
    .where("p.id_producto == 4846")

subset.show(10)

+-----------------+----------+--------------------+
|            price|      date|             product|
+-----------------+----------+--------------------+
|7.989999771118164|2022-11-21|1902 licor de gin...|
|7.989999771118164|2022-12-08|1902 licor de gin...|
|7.989999771118164|2022-12-06|1902 licor de gin...|
|7.989999771118164|2022-12-29|1902 licor de gin...|
|7.989999771118164|2023-01-16|1902 licor de gin...|
|7.989999771118164|2023-01-12|1902 licor de gin...|
|7.989999771118164|2023-01-13|1902 licor de gin...|
|7.989999771118164|2022-11-26|1902 licor de gin...|
|7.989999771118164|2022-12-01|1902 licor de gin...|
|7.989999771118164|2022-11-23|1902 licor de gin...|
+-----------------+----------+--------------------+
only showing top 10 rows



In [35]:
pan = subset.toPandas()

pan.to_csv("c:/tmp/extract/test.csv", index=False)

In [15]:
subset.write\
    .option("header", "true")\
    .csv("file:///c:/tmp/extract/test.csv")

In [39]:
spark.table("date_dim").toPandas().to_csv("c:/tmp/extract/date_dim.csv", index=False)

  series = series.astype(t, copy=False)
