In [1213]:
# Import Python libraries
import pandas as pd
import glob
from datetime import datetime
import numpy as np
from pyspark import StorageLevel
from pyspark.sql.types import StructField, DecimalType, StringType, IntegerType, DoubleType, FloatType, DateType, TimestampType
from pyspark.sql import functions as sf, Window as w
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

In [1214]:
# Start (or reuse) a Spark session running locally under the app name "customerPortfolio".
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("customerPortfolio")
    .getOrCreate()
)

# Location of the customer-portfolio CSV extract.
input_directory = '/home/pydev/workflow/dt_customer_portfolio/data/customerPortfolio.csv'

# Read the comma-separated file: the first line is the header and column types are inferred.
df_Data = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true", sep=",")
    .load(input_directory)
)

# Key_Fact = "<cod_cia>-<cod_co>-<cod_doc><consec_doc>", e.g. 2-1-FV2419738.
# Note there is no separator between cod_doc and consec_doc.
key_fact_parts = [
    sf.col('cod_cia'),
    sf.lit('-'),
    sf.col('cod_co'),
    sf.lit('-'),
    sf.col('cod_doc'),
    sf.col('consec_doc'),
]
df_Data = df_Data.withColumn('Key_Fact', sf.concat(*key_fact_parts))

# Window over the rows that share a Key_Fact. It is ordered by the partition key itself,
# so every row inside a partition ties on the ordering column.
# NOTE(review): row numbers assigned over this window follow an arbitrary order among
# the tied rows — confirm that this is acceptable for the downstream logic.
windowSpec = w.partitionBy("Key_Fact").orderBy("Key_Fact")

In [1215]:
# Restrict the working frame to a single sample document key.
df_Data = df_Data.filter(sf.col("Key_Fact") == '2-1-FV2419738')

# Number the rows inside each Key_Fact partition (windowSpec is defined in the setup cell).
df_Data = df_Data.withColumn("row_number_db", sf.row_number().over(windowSpec))

# Keep debitos_sa / creditos_sa / saldo only on the row numbered 1 of each key and put 0
# on every other row, so summing the *_real columns counts each key's amounts once.
# NOTE(review): presumably these amounts are repeated on every row of a document —
# confirm against the source extract.
is_first_row = sf.col("row_number_db") == 1
df_Data = (
    df_Data
    .withColumn('debitos_real', sf.when(is_first_row, sf.col("debitos_sa")).otherwise(0))
    .withColumn('creditos_real', sf.when(is_first_row, sf.col("creditos_sa")).otherwise(0))
    .withColumn('saldos_real', sf.when(is_first_row, sf.col("saldo")).otherwise(0))
)

# Number of rows per key, computed over an unordered partition window. The previous
# max(row_number_db) over the ordered window only returned the partition size because
# every row ties on the ordering column; counting makes the intent explicit and keeps
# the result correct if the window ordering is ever changed.
rows_per_key = sf.count(sf.lit(1)).over(w.partitionBy("Key_Fact"))

# One row per key holding the first-row debit spread evenly over the key's rows.
df_Data_db = (
    df_Data
    .select("Key_Fact", "row_number_db", "debitos_real")
    .withColumn('debitos_real_dist', sf.col("debitos_real") / rows_per_key)
    .orderBy('Key_Fact')
    .drop("row_number_db", "debitos_real")
)
# Rows other than the first carry 0 and are discarded here.
df_Data_db = df_Data_db.filter(sf.col("debitos_real_dist") != 0)
df_Data_db.show()

+-------------+-----------------+
|     Key_Fact|debitos_real_dist|
+-------------+-----------------+
|2-1-FV2419738|          41456.5|
+-------------+-----------------+



In [1216]:
# Preview the working frame (first 20 rows, long cell values truncated).
df_Data.show(n=20, truncate=True)

+---------+-------+-------------------+-------------+--------+--------------------+------------+------+-----------+-------+----------+-----------------+--------------------+--------------+----------+-----------+-------+--------+-------------+-------------+------------+-------------+-----------+
|rowid_doc|cod_cia|          fecha_doc|rowid_tercero|     nit|        razon_social|cod_sucursal|cod_co|cod_co_mvto|cod_doc|consec_doc|  linea_preveedor|       grupo_shopper|ind_naturaleza|debitos_sa|creditos_sa|  saldo|vlr_fact|     Key_Fact|row_number_db|debitos_real|creditos_real|saldos_real|
+---------+-------+-------------------+-------------+--------+--------------------+------------+------+-----------+-------+----------+-----------------+--------------------+--------------+----------+-----------+-------+--------+-------------+-------------+------------+-------------+-----------+
| 27140900|      2|2023-02-10 00:00:00|        43859|71704307|FLOREZ ARENAS HER...|         001|     1|         

In [1217]:
# Attach debitos_real_dist to every row of the working frame by Key_Fact.
# df_Data_db is derived from df_Data, so both sides share the same column lineage;
# explicit aliases make the join condition and the dropped column unambiguous instead
# of relying on Spark's special handling of self-join column references.
fact_rows = df_Data.alias("fact")
dist_rows = df_Data_db.alias("dist")
df_Data_Clean = (
    fact_rows
    .join(dist_rows, sf.col("fact.Key_Fact") == sf.col("dist.Key_Fact"))
    # Keep a single Key_Fact column (the one coming from df_Data_db).
    .drop(sf.col("fact.Key_Fact"))
)
df_Data_Clean.show()

+---------+-------+-------------------+-------------+--------+--------------------+------------+------+-----------+-------+----------+-----------------+--------------------+--------------+----------+-----------+-------+--------+-------------+------------+-------------+-----------+-------------+-----------------+
|rowid_doc|cod_cia|          fecha_doc|rowid_tercero|     nit|        razon_social|cod_sucursal|cod_co|cod_co_mvto|cod_doc|consec_doc|  linea_preveedor|       grupo_shopper|ind_naturaleza|debitos_sa|creditos_sa|  saldo|vlr_fact|row_number_db|debitos_real|creditos_real|saldos_real|     Key_Fact|debitos_real_dist|
+---------+-------+-------------------+-------------+--------+--------------------+------------+------+-----------+-------+----------+-----------------+--------------------+--------------+----------+-----------+-------+--------+-------------+------------+-------------+-----------+-------------+-----------------+
| 27140900|      2|2023-02-10 00:00:00|        43859|71704

In [1218]:
# Expose the working frame to Spark SQL under the view name DATA.
df_Data.createOrReplaceTempView("DATA")

# Totals of the invoice value and the first-row amounts, each divided by 1,000,000.
# show() returns None, so the query result is bound to `facts` first and displayed
# afterwards; `facts` therefore remains a usable DataFrame.
facts = spark.sql("SELECT SUM(vlr_fact)/1000000, SUM(debitos_real)/1000000, SUM(creditos_real)/1000000, SUM(saldos_real)/1000000 from DATA")
facts.show(truncate=False)

+-------------------------+-----------------------------+------------------------------+----------------------------+
|(sum(vlr_fact) / 1000000)|(sum(debitos_real) / 1000000)|(sum(creditos_real) / 1000000)|(sum(saldos_real) / 1000000)|
+-------------------------+-----------------------------+------------------------------+----------------------------+
|0.165826                 |0.165826                     |0.079149                      |0.086677                    |
+-------------------------+-----------------------------+------------------------------+----------------------------+



In [1219]:
# Print the class of the working frame object.
frame_class = type(df_Data)
print(frame_class)

<class 'pyspark.sql.dataframe.DataFrame'>


In [1220]:
# Recompute the per-key row index; this overwrites the existing row_number_db column.
# NOTE(review): the *_real columns were derived from the previous numbering, and the
# window orders rows by the partition key (all ties) — confirm this renumbering is
# still needed and cannot disagree with the earlier one.
df_Data = df_Data.withColumn(
    "row_number_db",
    sf.row_number().over(windowSpec),
)

# Display up to 50 rows of the working frame.
df_Data.show(n=50)

+---------+-------+-------------------+-------------+--------+--------------------+------------+------+-----------+-------+----------+-----------------+--------------------+--------------+----------+-----------+-------+--------+-------------+-------------+------------+-------------+-----------+
|rowid_doc|cod_cia|          fecha_doc|rowid_tercero|     nit|        razon_social|cod_sucursal|cod_co|cod_co_mvto|cod_doc|consec_doc|  linea_preveedor|       grupo_shopper|ind_naturaleza|debitos_sa|creditos_sa|  saldo|vlr_fact|     Key_Fact|row_number_db|debitos_real|creditos_real|saldos_real|
+---------+-------+-------------------+-------------+--------+--------------------+------------+------+-----------+-------+----------+-----------------+--------------------+--------------+----------+-----------+-------+--------+-------------+-------------+------------+-------------+-----------+
| 27140900|      2|2023-02-10 00:00:00|        43859|71704307|FLOREZ ARENAS HER...|         001|     1|         

In [1221]:
# Sampling data with SQL

# df_Data.createOrReplaceTempView("DATA")
# temp = spark.sql("SELECT * FROM DATA d WHERE d.Key_Fact = '2-1-FV2419738' ").show(truncate=False)
# facts = spark.sql("SELECT SUM(vlr_fact) from DATA").show(truncate=False)

In [1222]:
# print(type(temp))

In [1223]:
# Shape of the working frame as a (row count, column count) tuple.
row_total = df_Data.count()
column_total = len(df_Data.columns)
print((row_total, column_total))

(4, 23)


In [1224]:
# Grand total of vlr_fact over the working frame, shown as a one-row table.
df_Data.agg(sf.sum("vlr_fact")).show()

+-------------+
|sum(vlr_fact)|
+-------------+
|       165826|
+-------------+



In [1225]:
# Select the columns and remove the duplicates
# dropDisDF = df_Data.select(sf.col("debitos_sa").alias("debitos_sa_2"),
#                            sf.col("creditos_sa").alias("creditos_sa_2"),sf.col("saldo").alias("saldo_2"), sf.col("Key_Fact"))
# dropDisDF = dropDisDF.distinct()
# Join Data

# df_Data_Clean   = df_Data.join(dropDisDF, df_Data.Key_Fact == dropDisDF.Key_Fact, how="inner").drop(df_Data.Key_Fact)
# print((dropDisDF.count(), len(dropDisDF.columns)))
# dropDisDF = dropDisDF.distinct()

In [1226]:
# dropDisDF.select(sum(dropDisDF.vlr_fact_2)).show()
# NOTE(review): no active line visible in this notebook assigns dropDisDF (the only
# assignment in view is commented out in the cell above), so this cell presumably
# raises NameError after a kernel restart — confirm and restore the definition.
# Shows the total of the debitos_sa_2 column; `sum` here is expected to be the
# wildcard-imported pyspark.sql.functions.sum, not the Python builtin.
dropDisDF.select(sum(dropDisDF.debitos_sa_2)).show()

+--------------------+
|   sum(debitos_sa_2)|
+--------------------+
|1.366554661257399...|
+--------------------+



In [1227]:
# Preview dropDisDF (first 20 rows, long cell values truncated).
dropDisDF.show(n=20, truncate=True)

+------------+-------------+------------+--------------+
|debitos_sa_2|creditos_sa_2|     saldo_2|      Key_Fact|
+------------+-------------+------------+--------------+
|    273634.0|          0.0|    273634.0| 2-3-FV2163855|
|   1108268.0|    1098061.0|     10207.0| 2-3-FV2235835|
|    515704.0|     509097.0|      6607.0| 2-3-FV2239984|
|    824433.0|     804045.0|     20388.0|  2-9-FV272656|
|    756496.0|     754126.0|      2370.0|  2-9-FV278686|
|    756496.0|     754126.0|      2370.0| 2-14-FV233557|
|   1217720.0|    1205041.0|     12679.0| 2-14-FV233977|
|         0.0|      57800.0|    -57800.0|   2-276-NV212|
|   9413304.0|    9379584.0|     33720.0| 2-3-FV2315538|
|  1.660255E7|  1.5814754E7|    787796.0|2-10-FV2106647|
| 1.2157872E7|  1.2155136E7|      2736.0|  2-5-FV289711|
|   7676098.0|     160742.0|   7515356.0| 2-14-FV241592|
|   1407578.0|          0.0|   1407578.0| 2-1-FV2397124|
|    886796.0|     855493.0|     31303.0|  2-4-FV274217|
|   8960725.0|          0.0|   

In [1228]:
# Shape of dropDisDF as a (row count, column count) tuple.
dropdis_row_total = dropDisDF.count()
dropdis_column_total = len(dropDisDF.columns)
print((dropdis_row_total, dropdis_column_total))

(41470, 4)


In [1229]:
# print((dropDisDF.count(), len(dropDisDF.columns)))
# print((df_Data_Clean.count(), len(df_Data_Clean.columns)))

In [1230]:
# df_Data_Clean.show(50)

In [1231]:
# df_Data.printSchema()

In [1232]:
# Final preview of the working frame (first 20 rows, long cell values truncated).
df_Data.show(n=20, truncate=True)

+---------+-------+-------------------+-------------+--------+--------------------+------------+------+-----------+-------+----------+-----------------+--------------------+--------------+----------+-----------+-------+--------+-------------+-------------+------------+-------------+-----------+
|rowid_doc|cod_cia|          fecha_doc|rowid_tercero|     nit|        razon_social|cod_sucursal|cod_co|cod_co_mvto|cod_doc|consec_doc|  linea_preveedor|       grupo_shopper|ind_naturaleza|debitos_sa|creditos_sa|  saldo|vlr_fact|     Key_Fact|row_number_db|debitos_real|creditos_real|saldos_real|
+---------+-------+-------------------+-------------+--------+--------------------+------------+------+-----------+-------+----------+-----------------+--------------------+--------------+----------+-----------+-------+--------+-------------+-------------+------------+-------------+-----------+
| 27140900|      2|2023-02-10 00:00:00|        43859|71704307|FLOREZ ARENAS HER...|         001|     1|         