In [16]:
from pyspark.sql import SparkSession
from dotenv import load_dotenv
import os
load_dotenv()

spark = SparkSession.builder \
    .appName("SessionCentexETL") \
    .master("local[4]") \
    .config("spark.sql.shuffle.partitions", 4) \
    .config("spark.driver.memory", "4g") \
    .config("spark.jars", "C:\\jars\\mssql-jdbc-12.10.0.jre11.jar") \
    .getOrCreate()

jdbc_url = f"jdbc:sqlserver://{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')};databaseName={os.getenv('DB_NAME')};encrypt=true;trustServerCertificate=true"
properties = {
    "user": os.getenv('DB_USER'),
    "password": os.getenv('DB_PASSWORD'),
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

In [None]:
#Total de proveedores
query_total_prov = "(select dc.ID_Agenda, ag.AgendaNombre from Doc_Compra dc inner join Agenda ag on ag.ID_Agenda = dc.ID_Agenda group by ag.AgendaNombre, dc.ID_Agenda) AS query_totales"
df_total_prov = spark.read.jdbc(url=jdbc_url, table=query_total_prov, properties=properties)
print(f"Total proveedores: {df_total_prov.count()}")
df_total_prov.show(df_total_prov.count(), truncate=False)

Total proveedores: 443
+------------------+--------------------------------------------------------------------------------------------------+
|ID_Agenda         |AgendaNombre                                                                                      |
+------------------+--------------------------------------------------------------------------------------------------+
|20100210909       |LA POSITIVA SEGUROS Y REASEGUROS S.A.                                                             |
|20101128777       |DHL EXPRESS PERU SAC                                                                              |
|20101158927       |TRANSBER S.A.C.                                                                                   |
|20101266819       |CAMARA DE COMERCIO DE LIMA                                                                        |
|20101666167       |AVICOLA DON BRUNO SRL                                                                             |
|20106076635     

In [33]:
#Proveedores con dos registros
from pyspark.sql import functions as F
df_two_register = df_total_prov.groupBy("AgendaNombre").count().filter(F.col("count") > 1).select("AgendaNombre")
print(f"Proveedores con dos registros: {df_two_register.count()}")

df_two_select = df_total_prov.join(df_two_register, on="AgendaNombre", how="inner")

df_two_select.show(df_two_select.count(), truncate=False)

Proveedores con dos registros: 4
+-------------------------------+-----------+
|AgendaNombre                   |ID_Agenda  |
+-------------------------------+-----------+
|CHAMBILLA PEREZ, YHONATAN      |10744012410|
|CHAMBILLA PEREZ, YHONATAN      |74401241   |
|AZERRAD TENSERA, WILLIAM MOISES|10004191604|
|AZERRAD TENSERA, WILLIAM MOISES|00419160   |
|ARIMUYA MORALES, JULIO         |10053435861|
|ARIMUYA MORALES, JULIO         |05343586   |
|HUAMANI CALSIN, JESSELA        |40438092   |
|HUAMANI CALSIN, JESSELA        |10404380929|
+-------------------------------+-----------+



In [None]:
#Proceso de limnpieza para quedarme con solo RUC

df_ruc_pj = df_two_select.filter(
    (F.length("ID_Agenda") == 11)
)

print(df_ruc_pj.columns)

df_ruc_pj.show(df_ruc_pj.count(), truncate=False)

df_filter_repeat = df_total_prov.join(df_ruc_pj.select("AgendaNombre"), on="AgendaNombre", how="left_anti")
print(df_filter_repeat.columns)

df_union_no_repeat_provider = df_filter_repeat.unionByName(df_ruc_pj)
print(f"Total proveedores unidos sin repetir solo con ruc: {df_union_no_repeat_provider.count()}")
print(df_union_no_repeat_provider.columns)
#Tabla limpia sin repetir los proveedores con DNI y RUC se validaron ahora solo por su RUC
df_union_no_repeat_provider.show(df_union_no_repeat_provider.count(), truncate=False)


['AgendaNombre', 'ID_Agenda']
+-------------------------------+-----------+
|AgendaNombre                   |ID_Agenda  |
+-------------------------------+-----------+
|CHAMBILLA PEREZ, YHONATAN      |10744012410|
|AZERRAD TENSERA, WILLIAM MOISES|10004191604|
|ARIMUYA MORALES, JULIO         |10053435861|
|HUAMANI CALSIN, JESSELA        |10404380929|
+-------------------------------+-----------+

['AgendaNombre', 'ID_Agenda']
Total proveedores unidos sin repetir solo con ruc: 439
['AgendaNombre', 'ID_Agenda']
+--------------------------------------------------------------------------------------------------+------------------+
|AgendaNombre                                                                                      |ID_Agenda         |
+--------------------------------------------------------------------------------------------------+------------------+
|6 FUSION SOCIEDAD ANONIMA CERRADA                                                                 |20610042644       |
|A W F

In [32]:
from pyspark.sql import functions as F

query_cl = "(select ag.AgendaNombre, dc.ID_Agenda from Doc_Compra dc inner join Agenda ag on ag.ID_Agenda = dc.ID_Agenda where len(dc.ID_Agenda) = 11 or len(dc.ID_Agenda) = 8 group by ag.AgendaNombre, dc.ID_Agenda) AS query_clean"
df_agenda_prueba = spark.read.jdbc(url=jdbc_url, table=query_cl, properties=properties)
df_validos = df_agenda_prueba.filter(
    (F.length("ID_Agenda").isin([8, 11])) &
    (F.col("ID_Agenda").rlike("^[0-9]+$"))
)

df_no_validos = df_agenda_prueba.filter(
    ~((F.length("ID_Agenda").isin([8, 11])) & 
      (F.col("ID_Agenda").rlike("^[0-9]+$")))
)

df_duplicados = df_validos.groupBy("AgendaNombre").count().filter(F.col("count") > 1)

df_duplicados.show(truncate=False)

print("Total registros:", df_agenda_prueba.count())
print("Registros válidos:", df_validos.count())
print("Registros no válidos:", df_no_validos.count())

#df_validos.show(truncate=False)


+-------------------------------+-----+
|AgendaNombre                   |count|
+-------------------------------+-----+
|CHAMBILLA PEREZ, YHONATAN      |2    |
|AZERRAD TENSERA, WILLIAM MOISES|2    |
|ARIMUYA MORALES, JULIO         |2    |
|HUAMANI CALSIN, JESSELA        |2    |
+-------------------------------+-----+

Total registros: 424
Registros válidos: 420
Registros no válidos: 4


In [13]:
import pandas as pd
df_pd_val = pd.DataFrame(df_validos.toPandas())
df_pd_noval = pd.DataFrame(df_no_validos.toPandas())
df_pd_val.to_csv("DevueltosCSV/validos.csv", index=False, encoding='utf-8-sig', header=None) 
df_pd_noval.to_csv("DevueltosCSV/novalidos.csv", index=False, encoding='utf-8-sig', header=None)

In [8]:
df_no_validos.show(truncate=False)

+-------------------------------+-----------+
|AgendaNombre                   |ID_Agenda  |
+-------------------------------+-----------+
|BRUKER NANO GMBH               |DE200443246|
|BRUKER AXS SE                  |DE812037551|
|PEAK SCIENTIFIC INSTRUMENTS LTD|GB699501784|
|BRUKER DALTONIK GMBH           |DE114404287|
+-------------------------------+-----------+

