# Proceso de ETL  con Apache Spark 
Este script tiene la finalidad de automatizar la extracción, tranformación y carga de datos hacia Bigquery. La extración de los datos se hará desde a SAP Hana y SQL Server. Este proceso se hará utilizando el framework de cómputo distribuido de Apache Spark. 


Más concretamente el proceso general conciste en los siguientes pasos:
 
1) Descarga de datos de las distintas fuentes (SAP Hana y SQL Server) por medio de Apache Spark
2) Procesamiento de datos por medio de Apache Spark 
2) Subir los datos a archivos parquet en Google Cloud Storage 
3) Tranferir los datos de Cloud Storage a las tablas de BigQuery

**NOTA:** Apache Spark dede su versión 3.0 introdujo un nuevo objeto de almacenamiento distribucido llamado **Spark Pandas Data Frame**. Este objeto es el que usaremos para hacer la manipulación de datos, junto con sentencias SQL sobre este mismo objeto. La ventaja de usar el Spark Pandas DF en comaración con el Spark DF, es que el primero utiliza métodos basados en Pandas, por lo que la manipulación de los datos será muy similar a la que se hace en Pandas.


**TIEMPOS DE EJECUCION:** Este script está diseñado para que se ejecute todos los días a las 5:00 AM, por ende tomará el día anterior de la fecha actual para cargar las operaciones de ayer

**GLOSARIO:**

Nombre de columnas: 
-**SALES_OFF = UNE**


**PARAMTROS PARA LA CARGA DE DATOS** 

In [2]:
# 1) UNEs a consultar 
UNEs = ('D001', 'D002')

# 2) Credenciales SAP 
direccion_IP_SAP = "jdbc:sap://192.168.198.234:30241/" # Acá contiene  más elemento, no solo el IP del host, acá un desloce de los elementos: 

# - jdbc: Indica que se está utilizando Java Database Connectivity (JDBC) para la conexión.
# - sap: Especifica el tipo de base de datos al que te estás conectando, en este caso, SAP HANA.
# - 192.168.198.234: Es la dirección IP del host donde se encuentra la base de datos SAP HANA.
# - 30241: Es el puerto a través del cual se establece la conexión a la base de datos.

usuario_SAP  = "QLIKSAP"
password_SAP = "20S@pqlik"
driver_SAP = "com.sap.db.jdbc.Driver"


# 3) Credenciales SQL Sever (Eroute) 
# Configuración para la conexión a SQL Server
direccion_IP_SQL_SERVER = "jdbc:sqlserver://192.168.0.192;encrypt=false;databaseName=ASR;" # Muy importatante, vease como tengo el parametro encrypt=false, sin esto, no me dejaba conectarme a la BD 

usuario_SQL_Server = "consulta" 
password_SQL_Server  = "Consult@"
driver_SQL_Server = "com.microsoft.sqlserver.jdbc.SQLServerDriver" 



# 4) Parametros de GCP. 
# La exportación de los datos se hará a GCP  a 2 distintas repositorios de datos los cuales son Google Cloud Storage y BigQuery, en esta sección vamos a idnetificar los paámetros que se requiere para la correcta exportación de datos 
bucket = "gs://etl_apache_spark"
proyecto = "ciencia-de-datos-398421" # Este es el proyecto global de GCP
conjunto_datos_SAP_BQ = "SAP_Eroute"  # Esto es el conjunto de datos (equivalente a mi base de datos) en Bigquery. 
modo_escritura  = "WRITE_TRUNCATE"   # WRITE_APPEND va a agregar datos a los ya existentes, WRITE_TRUNCATE reemplaza los datos existentes por los nuevos en BQ 

**IMPORTACION DE LIBRERIAS**

In [3]:
import datetime
import pandas as pd
from google.cloud import bigquery # Interacción con bigqueyr 
from google.oauth2 import service_account # Autentificación en GCP 
import os 
from pyspark.pandas.config import set_option



 **SELECCION DE FECHA DE INICIO A FIN DE LA CARGA**
 Este script está pensado para ser un script que obtenga los registros solo de un día (el día de ayer) 

In [4]:
# 1) Obtengo la fecha actual 
fecha_actual  = datetime.date.today()

# 2) La paso a un objeto de pandas date time 
fecha_actual = pd.to_datetime(arg=fecha_actual) # Dado que el objeto de entrada es ya un objeto date time, no es neceario que le  indique el formato 
print("Fecha actual:", fecha_actual)

# 3) Obtengo la fecha del día de ayer 
dia = pd.to_timedelta(arg=1, unit="d") 
fecha_ayer = fecha_actual - dia
print("Fecha ayer:", fecha_ayer)

Fecha actual: 2024-03-01 00:00:00
Fecha ayer: 2024-02-29 00:00:00


In [5]:
# Todo los almacenams en tipo string 

current_year=  str(fecha_actual.year)
current_month =  str(fecha_actual.month)
current_day = str(fecha_actual.day)

# Hacemos una pequeña adaptación de los datos para le fornmato de la columna de DIA CLAVE dada de alta como STRING en SAP
if len(current_month) < 2 :
    current_month = '0' + current_month

yesterday_year =   str(fecha_ayer.year)
yesterday_month = str(fecha_ayer.month) 
yesterday_day =  str(fecha_ayer.day)

# Hacemos una pequeña adaptación de los datos para le fornmato de la columna de DIA CLAVE dada de alta como STRING en SAP  
if len(yesterday_month) < 2 :
    yesterday_month = '0' + yesterday_month

fecha_ayer = yesterday_day + "/" + yesterday_month + "/" + yesterday_year 

print("Año hoy:", current_year)
print("Mes hoy:", current_month)
print("Dia hoy:", current_day)

print("Año ayer:", yesterday_year)
print("Mes ayer:", yesterday_month)
print("Dia ayer:", yesterday_day)

Año hoy: 2024
Mes hoy: 03
Dia hoy: 1
Año ayer: 2024
Mes ayer: 02
Dia ayer: 29


In [6]:
# Primer formato de fecha de ayer 
yesterday_date =   yesterday_day + "/" + yesterday_month + "/" + yesterday_year

# Segundo formato de  fecha de ayer
yesterday_date_second_format  = yesterday_year + yesterday_month + yesterday_day # Esto es año, mes y dia 

print("primer formato:", yesterday_date)
print("Segundo formato:", yesterday_date_second_format)

primer formato: 29/02/2024
Segundo formato: 20240229


## 1) Autentificación en GCP 
La autentificación servira para poder hacer las tranferencias de datos desde hacía Cloud Storage y de Cloud Storage hacia Bigquery 

In [7]:
# Dirección de trabajo 
direccion = os.getcwd()

# Autentificación con las credenciales, nota: Estas credenciales ya contienen los permisos para interactual en gran medida con Google Cloud Storage y Bigquery 
bqcreds = service_account.Credentials.from_service_account_file(filename= direccion + "\\Credenciales\\Auth_Storage_BigQuery.json",  scopes = ['https://www.googleapis.com/auth/cloud-platform'])

# Obtencion del cliente de biquery que permitira interactual con bigquery 
client = bigquery.Client(credentials=bqcreds, project=bqcreds.project_id) # El cliente de bigquery es necesario para poder escribr en las tablas de BQ
client

<google.cloud.bigquery.client.Client at 0x2217ce53bf0>

## 2) Inicio de sesión en Apache Spark
El inicio de sesión de mi aplicación de Apache Spark es muy relevant, aqui le indicaremos los siguinetes aspectos de la sesión: 

- Proporcionar los conectores JDBC: Tenemos que descargar los controladores necesarioas para hacer las conexiones y brindale la dirección de dichas credenciales  
- Definición del cluster manager 
- RAM para cada executor (Exeutor memory):


**PENDIENET** profundizar más en todos los aspectos que puedo configurar y mencionar la configuración por degault que se tiene, ahorita simplemente le aumentare la memoria disponible que puede consumir 

 

In [8]:
import findspark # Se importa antes de cualquier inicialización  o impotación de spark 
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName(name="ETL GCP") \
    .config("spark.executor.memory", "12g") \
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .config("spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") \
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", r"C:\Users\OMBARRAZA\Documents\Sello Rojo\Apache Spark ETL\Credenciales\Auth_Storage_BigQuery.json") \
    .config("spark.jars", r"C:\Users\OMBARRAZA\Documents\Sello Rojo\Apache Spark ETL\drivers\sap\ngdbc-2.18.11.jar," +
            r"C:\Users\OMBARRAZA\Documents\Sello Rojo\Apache Spark ETL\drivers\sqljdbc_12.2\enu\mssql-jdbc-12.2.0.jre8.jar," +
            r"C:\Users\OMBARRAZA\miniconda3\envs\Apache_Spark\Lib\site-packages\pyspark\jars\gcs-connector-hadoop3-latest.jar") \
    .getOrCreate()

spark

In [9]:
configuracion = spark.sparkContext.getConf()

print("Memoria del ejecutor:", configuracion.get("spark.executor.memory"))

Memoria del ejecutor: 12g


## 3) Conexión a las bases de datos 

### 3.1) Autentificación a SAP HANA 

Usaremos los parametros previamente establecidos 

In [10]:
# 1) Le damos la dirección IP donde se encuentra el host de SAP Hana 
database_url_sap = direccion_IP_SAP

# 2) Le damos la autentificación 
database_properties_sap = {
    "user": usuario_SAP,
    "password": password_SAP,
    "driver": driver_SAP
}

### 3.2) Autentificación SQL Server 

In [11]:
# Configuración para la conexión a SQL Server
database_url_sql_server = direccion_IP_SQL_SERVER

database_properties_sql_server = { 
          "user": usuario_SQL_Server, 
          "password": password_SQL_Server, 
          "driver": driver_SQL_Server
}

## 4) Descarga e inspección de datos 
La descarga de datos se hace para ciertos aspetros de la operación de interés. Los más relevantes son: 
 
1) **Agenda Actual:** Los datos vinculados a la agenda son muy relevantes, estos nos permite observar cual es la agenda actual de cada vendedor y compararlo con las visitas reales que ha tenido. Con estos 2 conjuntos de datos podemos observar cual es el grado de cumplimiento de la agenda de cada vendedor para cada día  y a nivel semanal
2)  **Agenda Historica:**

PONER QUE DATOS VAMOS A DESCARGAR!! 

### 4.1) Agenda actual (clientes activos), desde SAP  

**Estos son los clientes activos actuales**  

La agenda actual la sacaremos de **SAP Hana**, para construir la agenda actual de las rutas tenemos que consultar las siguientes tablas: 

- **KNA1:** Contiene información acerca de los de clientes registrados en la empresa, asi como su estado (activo o inactivo) 
- **KNVV:** Contiene los datos de las rutas (que rutas atienden a que cliente)  y a que UNEs pertenece 


Dado que tenemos estas 2 tablas y requereimos información de ambas, vamos a hacer una unión de tablas

**COLUMNAS DE KNA1:**
- **KUNNR:** ID único de cada cliente, le dimos el alias de **ClienteClave**
- **NAME1**: Nomnbre registrado del cliente, le dimos el alias de **Nombre**
- **ORT02:** Es la localidad donde está registrado el cliente, le dimos el alias de **Localidad**
- **ORT01:** Es la colonia del cliente, se le dio el alias de **Colonia** 
- **PSTLZ:** Es el codigo postal del domicilio registrado del cliente, se le dio el alias de **CodigoPostal**
- **REGIO**: Es el estado de la republica, se le dio el alias de **Estado**
- **STRAS:** Es la calle y nbúmero del cliente, se le dio el alias de **Calle**
- **BAHNE:** Es la hora de apertura del cliente, se le dio el alias de **HoraApertura**
- **BAHNS** Es la hora de cierre del cliente, se le dio el alias de **HoraCierre**
- **LZONE:** Es la UNE a la que pertenece el cliente, le dimos el alías de **UNE**
- **LOEVM:** Es el estatus del cliente, le dimos el alias de **STATUS**  
- **KATR1:** Corresponde al día de visita lunes, se le dio el alias de **Lunes**
- **KATR2:** Corresponde al día de visita martes, se le dio el alias de **Martes**
- **KATR3:** Corresponde al día de visita miercoles, se le dio el alias de **Miercoles**
- **KATR4:** Corresponde al día de visita jueves, se le dio el alias de **Jueves**
- **KATR5:** Corresponde al día de visita viernes, se le dio el alias de **Viernes**
- **KATR6:** Corresponde al día de visita sábado, se le dio el alias de **Sabado**
- **KATR7:** Corresponde al día de visita domingo, se le dio el alias de **Domingo**



In [12]:
# Declaración del query  
query_clientes = f"""
SELECT 
KUNNR as ClienteClave, NAME1 as Nombre, ORT02 as Localidad,  ORT01 as Colonia, PSTLZ as CodigoPostal, 
REGIO as Estado, STRAS as Calle,  BAHNE as HoraApertura, BAHNS as HoraCierre, LZONE as UNE, LOEVM as STATUS, 
KATR1 as Lunes, KATR2 as Martes, KATR3 as Miercoles, KATR4 as Jueves, KATR5 as Viernes,  KATR6 as Sabado, 
KATR7 as Domingo
FROM SAPABAP1.KNA1
WHERE LZONE  IN {UNEs}
AND LOEVM = ''
AND  KATR1  != ''
AND  KATR2 != ''
AND  KATR3 != ''
AND  KATR4 != ''
AND  KATR5 != ''
AND  KATR6 != ''
"""

# IMPORTANTE: Tengo que darle el query en una cadena dentro de paranetesis 
query_clientes =  f"({query_clientes}) as query_alias"

**NOTA IMPORTANTE: SELECCION DE CLIENTES ACTIVOS** 
Los clientes activos o inactivos se pueden diferenciar con la columna "LOEVM" la cual tiene el nombre comun de "Petición de borrado", esta columna si está marcada con una X significa que ese cliente esta inactivo. Por lo tanto nos quedaremos solo con los clientes con esa columna vacia.

Adicionalmente me quedo solo con los clientes que tienen una frecuencia de visita, la frecuencina de visita es un valor binario que está en las columnas KATR1, KATR2, KATR3 etc. Son los días de la semana y se pone un 1 cuando se visita ese día de la semana 

In [13]:
%%time
# 1) Leer datos en un DataFrame
agenda_actual_clientes_activos = spark.read.jdbc(url=database_url_sap, table=query_clientes, properties=database_properties_sap)

#print("Tipo de objeto:", type(agenda_actual_clientes_activos))
# Notese que cuando lees un archivo por default se guarda en un spark data frame, yo lo convertire a un spark pandas DF 

# Lo paso a un Spark Pandas DF 
agenda_actual_clientes_activos = agenda_actual_clientes_activos.to_pandas_on_spark()

#print("Tipo de objeto:", type(agenda_actual_clientes_activos)) # Vease que ahora cambio del tipo de objeto

# Mostrar los datos leídos
#agenda_actual_clientes_activos # Ahota puedo interactual con él basicamente como un pandas DF 
# FALTA ALGO, quiero integrar el TIPO de clientes !!!, agregar eso 

CPU times: total: 15.6 ms
Wall time: 1.58 s




Inspeccion de ID de clientes activos 

In [14]:
#agenda_actual_clientes_activos["CLIENTECLAVE"].unique()

In [15]:
#agenda_actual_clientes_activos.query("KUNNR == 'E17664'") # Como se puede observa hay clients activos que tienen un ID fuera de lo normal 

**GENERACION DE VISITAS SEMANALES POR CLIENTE**

In [16]:
# 1) Cambiamos el tipo de variables 
agenda_actual_clientes_activos["LUNES"] = agenda_actual_clientes_activos["LUNES"].astype(int)
agenda_actual_clientes_activos["MARTES"] = agenda_actual_clientes_activos["MARTES"].astype(int)
agenda_actual_clientes_activos["MIERCOLES"] = agenda_actual_clientes_activos["MIERCOLES"].astype(int)
agenda_actual_clientes_activos["JUEVES"] = agenda_actual_clientes_activos["JUEVES"].astype(int)
agenda_actual_clientes_activos["VIERNES"] = agenda_actual_clientes_activos["VIERNES"].astype(int)
agenda_actual_clientes_activos["SABADO"] = agenda_actual_clientes_activos["SABADO"].astype(int)
agenda_actual_clientes_activos["DOMINGO"] = agenda_actual_clientes_activos["DOMINGO"].astype(int)

In [17]:
# 1) Ejecución de la suma 
set_option("compute.ops_on_diff_frames", True) # SUPER IMPORATANTE: permite ciertas operaciones de los spark pandas DF, se debe de ejecutar despues de crear la sesión de Spark!!

agenda_actual_clientes_activos['Frecuencia_Semanal'] = agenda_actual_clientes_activos.loc[:, 'LUNES':'SABADO'].sum(axis=1)
# Importante, esta operación en concreto sobre este spark pandas DF solo se pudo hacer habilitado la opcion de set_option("compute.ops_on_diff_frames", True) 

In [18]:
# Filtro de clientes activos realmente con visita programaeda  
agenda_actual_clientes_activos = agenda_actual_clientes_activos.query("Frecuencia_Semanal  > 0 ") # Nos quedamos con los clientes que tienen mayor a 0  visitas por semana 

agenda_actual_clientes_activos.reset_index(drop=True, inplace=True)

#### 4.1.1) Extracción de la tabla clientes - rutas
**EXTRACCION DE LA TABLA CLIENTES - RUTAS**

NOTAS:
- **KUNNR:** Es el ID del cliente 
- **VWERK:** Es la UNE al que pertenece 
- **VKGRP:** Es el Ruta

In [19]:
%%time 

# 1) Aislo la columna  de interés
ID_clientes_activos = agenda_actual_clientes_activos[["CLIENTECLAVE"]]

# 2) Valors unicos 
ID_clientes_activos = ID_clientes_activos["CLIENTECLAVE"].unique() # Esto me generará una spark pandas Serie

# 3) Lo paso el spark serie  a un numpy array 
ID_clientes_activos = ID_clientes_activos.to_numpy()

# 4) Lo paso a una tupla 
ID_clientes_activos = tuple (ID_clientes_activos) 



Py4JJavaError: An error occurred while calling o668.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2) (DESKTOP-S528GJL.sellorojo.com executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:124)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:102)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:168)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(Unknown Source)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:105)
	... 23 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:124)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:102)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:168)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(Unknown Source)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:105)
	... 23 more


In [20]:
query_rutas_cliente = f"""
SELECT 
    KNVV.KUNNR as ClienteClave, 
    KNVV.VKGRP as RUTA, 
    KNVV.ERDAT as FECHA
FROM 
    SAPABAP1.KNVV 
INNER JOIN (
    SELECT 
        KUNNR, 
        MAX(TO_DATE(ERDAT, 'YYYYMMDD')) AS MaxDate
    FROM 
        SAPABAP1.KNVV
    GROUP BY 
        KUNNR
) AS LatestRecords
ON 
    KNVV.KUNNR = LatestRecords.KUNNR AND TO_DATE(KNVV.ERDAT, 'YYYYMMDD') = LatestRecords.MaxDate
WHERE  KNVV.SPART = 10  AND  KNVV.KUNNR IN {ID_clientes_activos}
ORDER BY 
    TO_DATE(KNVV.ERDAT, 'YYYYMMDD') DESC
"""

# IMPORTANTE: Tengo que darle el query en una cadena dentro de paranetesis 
query_rutas_cliente =  f"({query_rutas_cliente}) as query_alias"

Py4JJavaError: An error occurred while calling o702.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 5) (DESKTOP-S528GJL.sellorojo.com executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:124)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:102)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:168)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(Unknown Source)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:105)
	... 23 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:124)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:102)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:168)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(Unknown Source)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:105)
	... 23 more


In [None]:
# 1) Leer datos en un DataFrame
rutas_cliente = spark.read.jdbc(url=database_url_sap, table=query_rutas_cliente, properties=database_properties_sap)

# 2) Lo pasamos a un spark pandas DF 
rutas_cliente = rutas_cliente.to_pandas_on_spark()

**VINCULACION DE RUTAS A CLIENTES ACTUALES ACTIVOS**

In [None]:
agenda_actual_clientes_activos = agenda_actual_clientes_activos.merge(right=rutas_cliente, on="CLIENTECLAVE", how="left")

### 4.2) Agenda historica, desde Eroute  

**SOLO AYER:** se almacenara solo la agenda del dia de ayer  
- La agenda historica la sacaremos de Eroute,  ..... EXPLICAR QUE ES, es la agenda histprica de cada cliente o de cada ruia o de ambas?
- **FALTA PONER EL SIGNIFICATO DE TODAS LAS COLUMNAS** 

In [None]:
%%time 

# 1) Alta del query 
query_agenda_historica = f"""
select  DiaClave,FrecuenciaClave, RUTClave, ClienteClave, Orden, ClaveCEDI from AgendaVendedor where ClaveCEDI in {UNEs} 
 and cast(DiaClave as date)  =  '{yesterday_day}-{yesterday_month}-{yesterday_year}'                
"""   
# El formato de fechas es en dias - mes - año 


# 2) Modificación del query 
# IMPORTANTE: este es el formato para los queries en spark  
query_agenda_historica =  f"({query_agenda_historica}) as query_alias"

# 3) Lectura de datos a un Spark DF 
agenda_historica = spark.read.jdbc(url=database_url_sql_server, table=query_agenda_historica, properties=database_properties_sql_server)

**AGREGAR SEMANA , AÑO Y MES**

In [None]:
from pyspark.sql.functions import to_date

# Suponiendo que df es tu DataFrame y 'fecha_string' es el nombre de la columna que contiene las fechas como strings.
agenda_historica = agenda_historica.withColumn("DiaClave", to_date(agenda_historica.DiaClave, "dd/MM/yyyy"))
agenda_historica = agenda_historica.to_pandas_on_spark()

In [None]:
agenda_historica["Month"] = agenda_historica["DiaClave"].dt.month
agenda_historica["Year"] = agenda_historica["DiaClave"].dt.year
agenda_historica["Week"] = agenda_historica["DiaClave"].dt.week
agenda_historica["Day"] = agenda_historica["DiaClave"].dt.day

### 4.3) Visitas desde Eroute
**SOLO DE AYER:** Se capturas las visitas solo de ayer 

**EXPLICACION DE LAS COLUMNAS**:
- **VisitaClave:** ID Único de la visita 
- **DiaClave:** Fecha de la visita 
- **ClienteClave:** ID  único del cliente
- **RUTClave:** ID único de la ruta que realizo esa visita 
- **Numero:** Es la secuencia en la que se visito ese día 
- **FechaHoraInicial:** Fecha y hora de llegada al cliente
- **FechaHoraFinal:** Fecha y hora de salida de con el cliente 

In [22]:
# 1) Query de las visitas 
query_visitas = f"""
select Visita.VisitaClave, Visita.DiaClave, Visita.ClienteClave, Visita.RUTClave, Ruta.AlmacenID, Visita.Numero,
       Visita.FechaHoraInicial, Visita.FechaHoraFinal from Visita
inner join Ruta on Visita.RUTClave = Ruta.RUTClave
where cast(DiaClave as date ) ='{yesterday_day}/{yesterday_month}/{yesterday_year}'
"""
## El formato es dia/mes/año

# 2) Tranformación del formato 
query_visitas =  f"({query_visitas}) as query_alias"

# 3) Leer datos en un Spark DataFrame
visitas = spark.read.jdbc(url=database_url_sql_server, table=query_visitas, properties=database_properties_sql_server)


**AGREGAMOS SEMANA, MES y AÑO**

In [24]:


# 1) Lo pasamos  a un formato de fecha 
visitas = visitas.withColumn("DiaClave", to_date(visitas.DiaClave, "dd/MM/yyyy"))

# 2) Lo pasamos a un spark pandas DF 
visitas = visitas.to_pandas_on_spark()

# 3) Generamos el mes, año y semana apartir de la fecha con la funcion dt.. 
visitas["Month"] = visitas["DiaClave"].dt.month
visitas["Year"] = visitas["DiaClave"].dt.year
visitas["Week"] = visitas["DiaClave"].dt.week
visitas["Day"] = visitas["DiaClave"].dt.day
visitas



Unnamed: 0,VisitaClave,DiaClave,ClienteClave,RUTClave,AlmacenID,Numero,FechaHoraInicial,FechaHoraFinal,Month,Year,Week,Day
0,1041FFC33428FED8,2024-02-29,338300000,D001042,D001,23,2024-02-29 08:37:48,2024-02-29 08:37:48,2,2024,9,29
1,10B729937FCAB055,2024-02-29,609300000,D001039,D001,4,2024-02-29 03:31:50,2024-02-29 03:31:50,2,2024,9,29
2,10BF7AEE4D2B2AB7,2024-02-29,1169930000,D121201,D121,37,2024-02-29 14:07:12,2024-02-29 14:08:42,2,2024,9,29
3,116A34C73911AE34,2024-02-29,1167720000,D121010,D121,4,2024-02-29 07:15:16,2024-02-29 07:20:05,2,2024,9,29
4,12F7DE9E301747BF,2024-02-29,1560110001,D095007,D095,16,2024-02-29 14:06:18,2024-02-29 14:07:10,2,2024,9,29
5,136A71846DBC6B3A,2024-02-29,121850006,D082102,D082,12,2024-02-29 11:54:51,2024-02-29 11:58:51,2,2024,9,29
6,143A3B6A90ED574F,2024-02-29,869070000,D007004,D007,49,2024-02-29 14:29:04,2024-02-29 14:29:04,2,2024,9,29
7,156D6DA37477214A,2024-02-29,467170000,D002042,D002,20,2024-02-29 11:15:16,2024-02-29 11:15:16,2,2024,9,29
8,1572F57D1E323B9B,2024-02-29,890160292,D045101,D045,3,2024-02-29 08:15:18,2024-02-29 08:15:18,2,2024,9,29
9,15FAA934DCCFCDF8,2024-02-29,467240000,D002042,D002,14,2024-02-29 10:28:41,2024-02-29 10:28:54,2,2024,9,29


In [26]:
#from pyspark.sql.functions import date_format

# 4) Tranformamos a un spark pandas DF 
## NOTA: Aparentemente no puedo pasar el spark DF a un spark pandas DF si hay una columna tipo datetime, tengo que pasarla a tipo string , acá una referencia: 
# https://stackoverflow.com/questions/76072664/convert-pyspark-dataframe-to-pandas-dataframe-fails-on-timestamp-column

#visitas = visitas.withColumn("FechaHoraInicial", date_format("FechaHoraInicial", "yyyy-MM-dd HH:mm:ss"))
#visitas= visitas.withColumn("FechaHoraFinal", date_format("FechaHoraFinal", "yyyy-MM-dd HH:mm:ss"))


# Ahoira si lo puedo pasar a un spark pandas DF 
#visitas = visitas.to_pandas_on_spark()

### 4.4) Ventas, cambios físicos y pedidos 

Toda esta información esta en las mismas 2 tablas, es una tabla cabecera (registros generales) y una tabla detalle con los registros a detalle. Estas 2 tablas son:  ZTSDCAB y ZTSDDET.

La clasificación del tipo de registros, es decir si se trato de una venta, una cambio físico, una carga real o un pedido se puede clasificar usando la columna **DOC_TYPE**. El tipo de registros que hay en esta columna son: 

- **ZPSR**: Registros de ventas
- **ZPSO**: Registros de paseo 
- **ZTO**: Cambios fisicos
- **ZPAS** Pedido planta 
- **ZPVE**: Pedido vendedor 



*EXPLICACION DE CLUMNAS DE LA TABLA ZTSDCAB:* 
- **PURCH_NO_C:** Es el ID único de la opreción (cualquier tipo de operación, venta, cambio etc.)  
- **DOC_TYPE:** Establece el tipo de operación 
- **SALES_OFF:** ID de la UNE 
- **SALES_GRP** ID de la ruta que ejecuto esa operación 
- **SALES_OFF:** Es la UNE
- **SALES_GRP:** Es la ruta 
- **PURCH_DATE:** Es la fecha en la que se hizo la operación
- **DIA_CALVE:** Es la fecha en la que se hizo la operación
- **FORMA_PAGO**: Categoria de forma de pago, se dejo con el mismo nombre 
- **TOTAL**: Total monetario

#### 4.4.1) Tabla general de ventas, cambios y pedidos 

In [27]:
# 1) Alta del query 
query_general_ventas_pedidos_y_cambios = f"""
select PURCH_NO_C, DOC_TYPE, SALES_OFF as UNE, SALES_GRP as RUTA, PURCH_DATE, DIA_CLAVE, FORMA_PAGO, TOTAL from SAPABAP1.ZTSDCAB  
WHERE DIA_CLAVE = '{yesterday_date}'
AND  DOC_TYPE IN ('ZPSR', 'ZTO', 'ZPAS', 'ZPVE')
"""
# El formato de la fecha es dia/mes/año 


# 2) Tranformamos formato 
query_general_ventas_y_cambios = f"({query_general_ventas_pedidos_y_cambios}) as query_alias"

# 3) Leer datos en un DataFrame
ventas_pedidos_y_cambios_general = spark.read.jdbc(url=database_url_sap, table=query_general_ventas_y_cambios, properties=database_properties_sap)

# 4) Lo pasamos a un spark pandas DF 
ventas_pedidos_y_cambios_general = ventas_pedidos_y_cambios_general.to_pandas_on_spark()

**AGREGAMOS NUEVAS COLUMNAS más legible**

In [28]:
equivalencia_tipo_movimiento = {"ZPSR": "Ventas", 
                "ZTO": "Cambios_Fisicos", 
                "ZPAS": "Pedidos_Planta", 
                "ZPVE": "Pedido_Vendedor"}

equivalencia_metodo_pago ={
    "3": "Transferencia", 
    "1": "Efectivo", 
    "2": "Cheque",
    "A": "Parcialidades",
    "": "Sin información"
}

In [29]:
ventas_pedidos_y_cambios_general["TIPO_OPERACION"] = ventas_pedidos_y_cambios_general["DOC_TYPE"].map(arg = equivalencia_tipo_movimiento)
ventas_pedidos_y_cambios_general["METODO_PAGO"] = ventas_pedidos_y_cambios_general["FORMA_PAGO"].map(arg = equivalencia_metodo_pago)

#### 4.4.2) Tablas detalle de ventas, pedidos y cambios físicos 


In [30]:
ID_registros = tuple(ventas_pedidos_y_cambios_general["PURCH_NO_C"].unique().to_numpy()) # Metemos los ID a una tupla



In [31]:
# 1) Definimos el query 
query_detalle_ventas_pedidos_y_cambios = f"""
select PURCH_NO_C, MATERIAL, TARGET_QTY as piezas from SAPABAP1.ZTSDDET where PURCH_NO_C in {ID_registros}
"""

# 2) Modificamos el formato 
query_detalle_ventas_pedidos_y_cambios = f"({query_detalle_ventas_pedidos_y_cambios}) as query_alias"

# 3) Ejecucion del query 
ventas_pedidos_y_cambios_detalle = spark.read.jdbc(url=database_url_sap, table=query_detalle_ventas_pedidos_y_cambios, properties=database_properties_sap)

# 4) Lo tranformamos a un spark pandas DF
ventas_pedidos_y_cambios_detalle  = ventas_pedidos_y_cambios_detalle.to_pandas_on_spark()



In [32]:
venta_pedidos_cambios = ventas_pedidos_y_cambios_detalle.merge(right=ventas_pedidos_y_cambios_general, on="PURCH_NO_C")

### 4.5) Carga real de productos 
Carga se refiere a la carga real efecutada en las unidaded (rutas). Es decir los productos que se terminaron subiendo a las unidades. Para esto tenemos que hacer las consultas a las siguientes 2 tablas:
  
- ZINVWMCAB y  ZINVWMDET   


_______

**GLOSARIO:** 

- **PURCH_NO_C:** Es el ID del movimiento, no se le puso ningún alias
-  **DOC_TYPE:** Tipo de documento, no se le dio alias 
- **SALES_OFF:** Centro de distribución se le dió el alias de **UNE** 
- **SALES_GRP:** Es la ruta, se le dio el alias de **RUTA**
- **PURCH_DATE** Fecha del movimiento, no se le dio un alias 


**NOTA MUY IMPORTANTE:** 
- Las fechas vienen en la fecha de CARGA, recordemos que la carga se hace un día de trabajo antes
- En la columna de DOCTYPE estas son los significados de cada catregoria: 
CA = Cargas 
PF = Prefacturas 
RE = Recargas    




**TABLA CARGA REAL GENERAL**

In [33]:
%%time 

# 1) Definición del query 
query_carga_real_general = f"""
select PURCH_NO_C, DOC_TYPE,  SALES_OFF as UNE, SALES_GRP as RUTA, PURCH_DATE
 from SAPABAP1.ZINVWMCAB
where PURCH_DATE = {yesterday_date_second_format}  
 AND SALES_OFF IN {UNEs}
"""

# 2) Modificamos el formato 
query_carga_real_general = f"({query_carga_real_general}) as query_alias"

# 3) Lectura a un Spark DF 
carga_real_general = spark.read.jdbc(url=database_url_sap, table=query_carga_real_general, properties=database_properties_sap)

# 4) Lo convertimos a una Spark pandas DF 
carga_real_general = carga_real_general.to_pandas_on_spark()

CPU times: total: 15.6 ms
Wall time: 577 ms


Agregamos una columna más legible para el tipo de carga 

In [34]:
# 1) Creamos un diccionario 
tipo_carga  ={
    "CA": "Carga", 
    "PF": "Prefacturas", 
    "RE": "Recargas"
}

# 2) Hace el mapping 
carga_real_general["TIPO_OPERACION"] = carga_real_general["DOC_TYPE"].map(arg = tipo_carga)

**TABLA CARGA REAL DETALLE**

In [35]:
IDs_Carga = tuple(carga_real_general["PURCH_NO_C"].unique().to_numpy()) # Metemos los ID a una tupla
IDs_Carga



('20240228212710',
 '20240228212714',
 '20240228212751',
 'PF20240228D002107',
 '20240228211955',
 '20240228211959',
 '20240228212708',
 '20240228123430',
 '20240228212348',
 '20240228212448',
 '20240228212816',
 '20240228212853',
 '20240228212711',
 '20240228212819',
 '20240228212745',
 '20240228212847',
 '20240228212315',
 '20240228212259',
 '20240228212326',
 '20240228211957',
 '20240228212736',
 '20240228212431',
 '20240228212715',
 '20240228212843',
 '20240228212757',
 '20240228212813',
 '20240228212212',
 '20240228212859',
 '20240228212433',
 '20240228212156',
 'PF20240228D001037',
 '20240228212446',
 '20240228212841',
 '20240228212758',
 '20240228212209',
 '20240228211928',
 '20240228212410',
 '20240228212818',
 '20240228212316',
 '20240228212721',
 '20240228212730',
 '20240228212836',
 '20240228212317',
 '20240228212500',
 '20240228211925',
 '20240228212329',
 '20240228212211',
 '20240228212844',
 '20240228212722',
 '20240228212723',
 '20240228211938',
 '20240228212449',
 '2024

In [36]:
# 1) Definición del query 
query_carga_real_detalle = f"""
select PURCH_NO_C, MATERIAL, TARGET_QTY as piezas from SAPABAP1.ZINVWMDET where PURCH_NO_C in {IDs_Carga}
""" 

# 2) Tranformación del formato 
query_carga_real_detalle = f"({query_carga_real_detalle}) as query_alias"

# 3) Extracción de los datos 
carga_real_detalle = spark.read.jdbc(url=database_url_sap, table=query_carga_real_detalle, properties=database_properties_sap)

# 4) Lo pasamos a un spark pandas DF 
carga_real_detalle = carga_real_detalle.to_pandas_on_spark()



**UNION ENTRE CARGA REAL GENERAL Y CARGA REAL DETALLE**

In [37]:
# 1) Pasamos un spark pandas DF 
carga_real = carga_real_detalle.merge(right=carga_real_general, on="PURCH_NO_C")
#carga_real

### 4.6) Volumen de por productos 
Viene cualquier clase  de productos, materias primas, productos terminados etc. 

In [38]:
# 1) Definición del query 
query_productos = """
select MATNR as Material, BRGEW as PesoTotal, NTGEW as PesoLiquido, GEWEI as Unidades from SAPABAP1.MARA where MTART = 'FERT'
"""
# La columna MTART, es el tipo de material, y los productos de venta son FERT de la columna MTAR

# NOTA: En la base de datos ST son piezas! 

# 2) Tranformación del formato query  
query_productos = f"({query_productos}) as query_alias"

# 3) Ejecución del query hacia un spark DF 
vol_productos = spark.read.jdbc(url=database_url_sap, table=query_productos, properties=database_properties_sap)

# 4) Tranformación hacia un spark pandas DF
vol_productos = vol_productos.to_pandas_on_spark()

In [39]:
#vol_productos.sort_values(by=["PESOTOTAL"], ascending=False)

## 5) Tranformación de datos 

Migrar hacia esta sección cualquier manipulacion por más minima que sea , (los merge princiaplenten)

### 5.1) Agregar el volumen a las operaciones de venta y cambios físicos 


In [40]:
# Hacemos el merge de pedidos, ventas y cambios con su respectivo volumen por pieza 
venta_pedidos_cambios = venta_pedidos_cambios.merge(right=vol_productos, on=["MATERIAL"])

### 5.2) Agregamos una columna de meterial resumen 

In [43]:
# Tranformar la columna de material para que  quede  solo los ultimos numeros !
venta_pedidos_cambios["METERIAL_RESUMEN"] = venta_pedidos_cambios["MATERIAL"].astype(int)

Unnamed: 0,PURCH_NO_C,MATERIAL,PIEZAS,DOC_TYPE,UNE,RUTA,PURCH_DATE,DIA_CLAVE,FORMA_PAGO,TOTAL,TIPO_OPERACION,METODO_PAGO,PESOTOTAL,PESOLIQUIDO,UNIDADES,columna,material_2
0,PAD09510220240228121850383,6,3.0,ZPAS,D095,102,20240228,28/02/2024,,0.0,Pedidos_Planta,Sin información,1.16,1.0,KG,80,6
1,PAD09510220240228121850383,25,3.0,ZPAS,D095,102,20240228,28/02/2024,,0.0,Pedidos_Planta,Sin información,0.31,0.25,KG,580,25
2,PAD09510220240228121850383,80,3.0,ZPAS,D095,102,20240228,28/02/2024,,0.0,Pedidos_Planta,Sin información,2.17,1.8,KG,333,80
3,PAD09510220240228121850383,640,4.0,ZPAS,D095,102,20240228,28/02/2024,,0.0,Pedidos_Planta,Sin información,0.31,0.22,KG,347,640
4,PAD09510220240228121850383,653,2.0,ZPAS,D095,102,20240228,28/02/2024,,0.0,Pedidos_Planta,Sin información,0.31,0.22,KG,355,653
5,PAD09510220240228121850383,750,12.0,ZPAS,D095,102,20240228,28/02/2024,,0.0,Pedidos_Planta,Sin información,1.08,0.9,KG,356,750
6,PAD08210620240228890051600,750,12.0,ZPAS,D082,106,20240228,28/02/2024,,0.0,Pedidos_Planta,Sin información,1.08,0.9,KG,357,750
7,PAD08210620240228890051600,656,4.0,ZPAS,D082,106,20240228,28/02/2024,,0.0,Pedidos_Planta,Sin información,0.31,0.22,KG,412,656
8,PAD08210620240228890051600,653,4.0,ZPAS,D082,106,20240228,28/02/2024,,0.0,Pedidos_Planta,Sin información,0.31,0.22,KG,609,653
9,PAD08210620240228890051600,640,4.0,ZPAS,D082,106,20240228,28/02/2024,,0.0,Pedidos_Planta,Sin información,0.31,0.22,KG,612,640


## 6) Carga de datos hacia GCP 

### 6.1) Parámetros 

In [44]:
print("Nombre del Bucket en Google Cloud Storage:", bucket)
print("Nombre del proyecto en GCP:", proyecto)
print("Conjunto de datos (base de datos) en Big Query:", conjunto_datos_SAP_BQ)
print("Método de inserción de datos en Big Query:", modo_escritura)
# WRITE_APPEND va a agregar datos a los ya existentes, WRITE_TRUNCATE reemplaza los datos existentes por los nuevos en BQ 


Nombre del Bucket en Google Cloud Storage: gs://etl_apache_spark
Nombre del proyecto en GCP: ciencia-de-datos-398421
Conjunto de datos (base de datos) en Big Query: SAP
Método de inserción de datos en Big Query: WRITE_TRUNCATE
