# Spark - Escritura

In [1]:
# pip install turfpy

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import json
from turfpy.measurement import boolean_point_in_polygon
from geojson import Point, Feature
import subprocess
import uuid
from datetime import datetime
import pytz
from pyspark.sql.functions import sum as _sum, max as _max, concat_ws, col, udf, to_timestamp, year, month, dayofmonth, hour, minute, second, to_date, date_format, row_number, desc
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

## Spark session

In [3]:
spark = SparkSession.builder.appName("DataProcessing").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# Parquet

In [4]:
df = spark.read.parquet("/datalake/raw/stagging")

In [5]:
df.show()
df.count()

+--------+----------+-------------------+-----------+------------+-----------------+--------------------+--------------------+------------------+
|latitude| longitude|               date|customer_id| employee_id|quantity_products|            order_id|             commune|      neighborhood|
+--------+----------+-------------------+-----------+------------+-----------------+--------------------+--------------------+------------------+
|6.240851|-75.564423|17/06/2024 17:04:00|       1380| Employee_91|               39|5e943afa-9b6d-41e...|Comuna 10 - La Ca...|     Bomboná No. 1|
|6.226864| -75.52891|17/06/2024 17:04:16|       1679| Employee_65|               48|6ae6f0a7-f6b8-4aa...|Comuna 9 - Buenos...|     Ocho de Marzo|
|6.295899|-75.539886|17/06/2024 17:04:01|       1826| Employee_59|               41|5ad91aab-7a9b-49f...|  Comuna 1 - Popular|       La Avanzada|
|6.243185|-75.533776|17/06/2024 17:04:17|       1402|Employee_177|               50|6a3749d7-5a76-4a8...|Comuna 8 - Villa ..

5936

## hadoop

In [6]:
command = "hadoop fs -ls /datalake/raw/stagging | awk '{print $NF}'"
file_names = subprocess.check_output(command, shell=True).decode().split('\n')

command = "hadoop fs -ls /checkpoints/commits | awk '{print $NF}'"
commits_names = subprocess.check_output(command, shell=True).decode().split('\n')

command = "hadoop fs -ls /checkpoints/offsets | awk '{print $NF}'"
offsets_names = subprocess.check_output(command, shell=True).decode().split('\n')

command = "hadoop fs -ls /datalake/raw/stagging/_spark_metadata | awk '{print $NF}'"
metadata_names = subprocess.check_output(command, shell=True).decode().split('\n')

## Identificador de cada tienda
Se crea un store ID para cada tienda teniendo en cuenta: "latitude", "longitude", "commune" y "neighborhood"

In [7]:
df1 = df.withColumn("store_id", concat_ws("_", "latitude", "longitude", "commune", "neighborhood"))
df1.show()

+--------+----------+-------------------+-----------+------------+-----------------+--------------------+--------------------+------------------+--------------------+
|latitude| longitude|               date|customer_id| employee_id|quantity_products|            order_id|             commune|      neighborhood|            store_id|
+--------+----------+-------------------+-----------+------------+-----------------+--------------------+--------------------+------------------+--------------------+
|6.240851|-75.564423|17/06/2024 17:04:00|       1380| Employee_91|               39|5e943afa-9b6d-41e...|Comuna 10 - La Ca...|     Bomboná No. 1|6.240851_-75.5644...|
|6.226864| -75.52891|17/06/2024 17:04:16|       1679| Employee_65|               48|6ae6f0a7-f6b8-4aa...|Comuna 9 - Buenos...|     Ocho de Marzo|6.226864_-75.5289...|
|6.295899|-75.539886|17/06/2024 17:04:01|       1826| Employee_59|               41|5ad91aab-7a9b-49f...|  Comuna 1 - Popular|       La Avanzada|6.295899_-75.5398...

## Acumulado de ventas por día y tienda

In [8]:
# Convertir la columna 'date' a tipo timestamp
df1 = df1.withColumn("timestamp", to_timestamp(col("date"), "dd/MM/yyyy HH:mm:ss"))
df1.show()

+--------+----------+-------------------+-----------+------------+-----------------+--------------------+--------------------+------------------+--------------------+-------------------+
|latitude| longitude|               date|customer_id| employee_id|quantity_products|            order_id|             commune|      neighborhood|            store_id|          timestamp|
+--------+----------+-------------------+-----------+------------+-----------------+--------------------+--------------------+------------------+--------------------+-------------------+
|6.240851|-75.564423|17/06/2024 17:04:00|       1380| Employee_91|               39|5e943afa-9b6d-41e...|Comuna 10 - La Ca...|     Bomboná No. 1|6.240851_-75.5644...|2024-06-17 17:04:00|
|6.226864| -75.52891|17/06/2024 17:04:16|       1679| Employee_65|               48|6ae6f0a7-f6b8-4aa...|Comuna 9 - Buenos...|     Ocho de Marzo|6.226864_-75.5289...|2024-06-17 17:04:16|
|6.295899|-75.539886|17/06/2024 17:04:01|       1826| Employee_59

In [9]:
# Extraer solo la parte de la fecha
df1 = df1.withColumn("date_only", date_format(col("timestamp"), "yyyy-MM-dd"))
df1.select("date_only").show()

+----------+
| date_only|
+----------+
|2024-06-17|
|2024-06-17|
|2024-06-17|
|2024-06-17|
|2024-06-17|
|2024-06-17|
|2024-06-17|
|2024-06-17|
|2024-06-17|
|2024-06-17|
|2024-06-17|
|2024-06-17|
|2024-06-17|
|2024-06-17|
|2024-06-17|
|2024-06-17|
|2024-06-17|
|2024-06-17|
|2024-06-17|
|2024-06-17|
+----------+
only showing top 20 rows



In [10]:
# Agrupar los datos por fecha y vendedor para calcular el total de ventas por vendedor
df_seller_aggregated = df1.groupBy("date_only", "employee_id") \
    .agg(_sum("quantity_products").alias("total_sales"))
df_seller_aggregated.show()

+----------+------------+-----------+
| date_only| employee_id|total_sales|
+----------+------------+-----------+
|2024-06-17| Employee_41|       2543|
|2024-06-17| Employee_97|       2916|
|2024-06-17| Employee_89|       2140|
|2024-06-17|Employee_145|       2716|
|2024-06-17|Employee_109|       3070|
|2024-06-17| Employee_81|       2358|
|2024-06-17|Employee_159|       2724|
|2024-06-17| Employee_95|       2447|
|2024-06-17|Employee_137|       2661|
|2024-06-17|  Employee_9|       2823|
|2024-06-17|Employee_167|       3023|
|2024-06-17|Employee_197|       2812|
|2024-06-17|Employee_111|       2647|
|2024-06-17|Employee_105|       1844|
|2024-06-17|  Employee_7|       3332|
|2024-06-17|Employee_149|       2887|
|2024-06-17|Employee_157|       2249|
|2024-06-17| Employee_77|       2456|
|2024-06-17|Employee_163|       2635|
|2024-06-17|Employee_107|       2853|
+----------+------------+-----------+
only showing top 20 rows



## Ranking de vendedores diarios

In [11]:
# Crear una ventana para particionar por fecha y ordenar por total de ventas de mayor a menor
window_spec = Window.partitionBy("date_only").orderBy(col("total_sales").desc())
# Agregar un número de fila para identificar al vendedor con más ventas cada día
df_seller_ranked_day = df_seller_aggregated.withColumn("rank", row_number().over(window_spec))
df_seller_ranked_day.show()

+----------+------------+-----------+----+
| date_only| employee_id|total_sales|rank|
+----------+------------+-----------+----+
|2024-06-17| Employee_91|       3847|   1|
|2024-06-17| Employee_31|       3744|   2|
|2024-06-17|Employee_189|       3567|   3|
|2024-06-17| Employee_59|       3523|   4|
|2024-06-17|Employee_183|       3447|   5|
|2024-06-17|Employee_117|       3409|   6|
|2024-06-17|Employee_129|       3408|   7|
|2024-06-17|Employee_151|       3403|   8|
|2024-06-17|  Employee_7|       3332|   9|
|2024-06-17| Employee_93|       3323|  10|
|2024-06-17| Employee_99|       3287|  11|
|2024-06-17| Employee_61|       3223|  12|
|2024-06-17|Employee_195|       3210|  13|
|2024-06-17| Employee_33|       3203|  14|
|2024-06-17|Employee_191|       3153|  15|
|2024-06-17| Employee_63|       3141|  16|
|2024-06-17|Employee_141|       3138|  17|
|2024-06-17| Employee_15|       3128|  18|
|2024-06-17| Employee_57|       3121|  19|
|2024-06-17|Employee_123|       3114|  20|
+----------

## Mayor vendedor del día

In [12]:
# Filtrar para obtener solo el vendedor con más ventas cada día
df_top_seller = df_seller_ranked_day.filter(col("rank") == 1).drop("rank")
df_top_seller.show()

+----------+-----------+-----------+
| date_only|employee_id|total_sales|
+----------+-----------+-----------+
|2024-06-17|Employee_91|       3847|
+----------+-----------+-----------+



# Acumulado diario de ventas por tienda

In [13]:
# Agrupar los datos por fecha y tienda para calcular el total de ventas en esa tienda
df_store_aggregated = df1.groupBy("date_only", "latitude", "longitude", "store_id") \
    .agg(_sum("quantity_products").alias("total_store"))
df_store_aggregated.show()

+----------+--------+----------+--------------------+-----------+
| date_only|latitude| longitude|            store_id|total_store|
+----------+--------+----------+--------------------+-----------+
|2024-06-17|6.231016|-75.531541|6.231016_-75.5315...|       2456|
|2024-06-17|6.221932|-75.528966|6.221932_-75.5289...|       2543|
|2024-06-17|6.255342|-75.555875|6.255342_-75.5558...|       2249|
|2024-06-17|6.274861|-75.533979|6.274861_-75.5339...|       2654|
|2024-06-17| 6.31383|-75.556608|6.31383_-75.55660...|       2421|
|2024-06-17|6.250895|-75.564412|6.250895_-75.5644...|       2396|
|2024-06-17| 6.23482| -75.54546|6.23482_-75.54546...|       2887|
|2024-06-17|6.233771|-75.542107|6.233771_-75.5421...|       2638|
|2024-06-17|6.263214|-75.565651|6.263214_-75.5656...|       3409|
|2024-06-17|6.273109|-75.563901|6.273109_-75.5639...|       2413|
|2024-06-17| 6.22135|-75.548236|6.22135_-75.54823...|       2533|
|2024-06-17|6.268129|-75.532389|6.268129_-75.5323...|       2624|
|2024-06-1

In [14]:
# Crear una ventana para particionar por fecha y ordenar por total de ventas de mayor a menor en cada tienda
window_spec2 = Window.partitionBy("date_only").orderBy(col("total_store").desc())
# Agregar un número de fila para identificar la tienda con más ventas cada día
df_store_ranked_day = df_store_aggregated.withColumn("rank", row_number().over(window_spec2))
df_store_ranked_day.show()

+----------+--------+----------+--------------------+-----------+----+
| date_only|latitude| longitude|            store_id|total_store|rank|
+----------+--------+----------+--------------------+-----------+----+
|2024-06-17|6.240851|-75.564423|6.240851_-75.5644...|       3847|   1|
|2024-06-17|6.228642|-75.529782|6.228642_-75.5297...|       3744|   2|
|2024-06-17|6.231971| -75.54485|6.231971_-75.5448...|       3567|   3|
|2024-06-17|6.295899|-75.539886|6.295899_-75.5398...|       3523|   4|
|2024-06-17|6.256769| -75.53969|6.256769_-75.5396...|       3447|   5|
|2024-06-17|6.263214|-75.565651|6.263214_-75.5656...|       3409|   6|
|2024-06-17|6.233554|-75.565071|6.233554_-75.5650...|       3408|   7|
|2024-06-17|6.221331|-75.527242|6.221331_-75.5272...|       3403|   8|
|2024-06-17|6.256553|-75.552019|6.256553_-75.5520...|       3332|   9|
|2024-06-17|6.244468|-75.557789|6.244468_-75.5577...|       3323|  10|
|2024-06-17|6.232999| -75.53153|6.232999_-75.5315...|       3287|  11|
|2024-

## Guardar resultados

In [16]:
df_seller_ranked_day.write.parquet("/datalake/gold/df_seller_ranked_day")
df_store_ranked_day.write.parquet("/datalake/gold/df_store_ranked_day")

                                                                                

In [17]:
df3 = spark.read.parquet("/datalake/gold/df_seller_ranked_day2")
df3.show()

+----------+------------+-----------+----+
| date_only| employee_id|total_sales|rank|
+----------+------------+-----------+----+
|2024-06-17| Employee_91|       3847|   1|
|2024-06-17| Employee_31|       3744|   2|
|2024-06-17|Employee_189|       3567|   3|
|2024-06-17| Employee_59|       3523|   4|
|2024-06-17|Employee_183|       3447|   5|
|2024-06-17|Employee_117|       3409|   6|
|2024-06-17|Employee_129|       3408|   7|
|2024-06-17|Employee_151|       3403|   8|
|2024-06-17|  Employee_7|       3332|   9|
|2024-06-17| Employee_93|       3323|  10|
|2024-06-17| Employee_99|       3287|  11|
|2024-06-17| Employee_61|       3223|  12|
|2024-06-17|Employee_195|       3210|  13|
|2024-06-17| Employee_33|       3203|  14|
|2024-06-17|Employee_191|       3153|  15|
|2024-06-17| Employee_63|       3141|  16|
|2024-06-17|Employee_141|       3138|  17|
|2024-06-17| Employee_15|       3128|  18|
|2024-06-17| Employee_57|       3121|  19|
|2024-06-17|Employee_123|       3114|  20|
+----------

In [18]:
df4 = spark.read.parquet("/datalake/gold/df_store_ranked_day2")
df4.show()

+----------+--------+----------+--------------------+-----------+----+
| date_only|latitude| longitude|            store_id|total_store|rank|
+----------+--------+----------+--------------------+-----------+----+
|2024-06-17|6.240851|-75.564423|6.240851_-75.5644...|       3847|   1|
|2024-06-17|6.228642|-75.529782|6.228642_-75.5297...|       3744|   2|
|2024-06-17|6.231971| -75.54485|6.231971_-75.5448...|       3567|   3|
|2024-06-17|6.295899|-75.539886|6.295899_-75.5398...|       3523|   4|
|2024-06-17|6.256769| -75.53969|6.256769_-75.5396...|       3447|   5|
|2024-06-17|6.263214|-75.565651|6.263214_-75.5656...|       3409|   6|
|2024-06-17|6.233554|-75.565071|6.233554_-75.5650...|       3408|   7|
|2024-06-17|6.221331|-75.527242|6.221331_-75.5272...|       3403|   8|
|2024-06-17|6.256553|-75.552019|6.256553_-75.5520...|       3332|   9|
|2024-06-17|6.244468|-75.557789|6.244468_-75.5577...|       3323|  10|
|2024-06-17|6.232999| -75.53153|6.232999_-75.5315...|       3287|  11|
|2024-

In [None]:
# df = df.withColumn("result", calcular_comuna_udf(df["latitude"], df["longitude"]))
# df = df.select("*", "result.*").drop("result")

# df = df.withColumn("date", to_timestamp(df["date"], "dd/MM/yyyy HH:mm:ss"))
# df = df.withColumn("day", dayofmonth(df["date"]))\
#        .withColumn("month", month(df["date"]))\
#        .withColumn("year", year(df["date"]))\
#        .withColumn("hour", hour(df["date"]))\
#        .withColumn("minute", minute(df["date"]))\
#        .withColumn("second", second(df["date"]))

# date_now = datetime.now(pytz.timezone('America/Bogota')).strftime("%d%m%Y_%H%M%S")
# path_write = f"/datalake/silver/stagging/{date_now}"


# df.write.parquet(path_write)

# for name in file_names:
#     if ".parquet" in name:
#         command = f"hadoop fs -mv {name} /datalake/raw/ingested"
#         subprocess.run(command, shell=True, check=True)
        
# for commit in commits_names:
#     if (commit != "items" and commit != ""):
#         command = f"hadoop fs -rm -r {commit}"
#         subprocess.run(command, shell=True, check=True)
        
# for offset in offsets_names:
#     if (offset != "items" and offset != ""):
#         command = f"hadoop fs -rm -r {offset}"
#         subprocess.run(command, shell=True, check=True)
        
# for meta in metadata_names:
#     if (meta != "items" and meta != ""):
#         command = f"hadoop fs -rm -r {meta}"
#         subprocess.run(command, shell=True, check=True)