In [22]:
import csv
from collections import defaultdict
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, count, when

Lendo os arquivos csv como dataframes, troca do nome da coluna ID na tabela GCP para evitar ambiguidade.

Verificando a quantidade de linhas da tabela e suas diferenças:

In [19]:
spark = SparkSession.builder.appName("Query").getOrCreate()

file_name_local = 'application_record_local'
df_local = spark.read.csv(file_name_local + ".csv", header=True, sep=";", encoding="latin1")

file_name_gcp = 'application_record_gcp'
df_gcp = spark.read.csv(file_name_gcp + ".csv", header=True, sep=",", encoding="latin1")
df_gcp = df_gcp.withColumnRenamed("ID", "gcp_ID")

# contando o número de registros
df_local_count = df_local.count()
df_gcp_count = df_gcp.count()

# diferenças entre os data frames
diferenca = df_local_count - df_gcp_count

resultado = {
    'TB_LOCAL': df_local_count,
    'TB_GCP': df_gcp_count,
    'DIFERENCA': diferenca
}

for chave, valor in resultado.items():
    print('{}: {}'.format(chave, valor))

+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+
|     ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|    NAME_INCOME_TYPE| NAME_EDUCATION_TYPE|  NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|
+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+
|5008804|          M|           Y|              Y|           0|        427500.0|             Working|    Higher education|      Civil marriage| Rented apartment|    -12005|        -4542|         1

Verificando a quantidade de IDs distintos da tabela e suas diferenças:

In [6]:
df_local_distinct_count = df_local.select("ID").distinct().count()
df_gcp_distinct_count = df_gcp.select("gcp_ID").distinct().count()
diferenca_id_distintos = df_local_distinct_count - df_gcp_distinct_count

resultado = {
    'TB_LOCAL_DISTINCT': df_local_distinct_count,
    'TB_GCP_DISTINCT': df_gcp_distinct_count,
    'DIFERENCA_ID_DISTINTOS': diferenca_id_distintos
}

for chave, valor in resultado.items():
    print('{}: {}'.format(chave, valor))

TB_GCP_DISTINCT: 434459
TB_LOCAL_DISTINCT: 438510
DIFERENCA_ID_DISTINTOS: 4051


Lista de IDs que tem na tabela local e não na GCP:

In [20]:
df_result = df_local.join(df_gcp, df_local["ID"] == df_gcp["gcp_ID"], how='left')

# filtro onde o ID da tabela gcp for nulo
df_result = df_result.filter(F.col("gcp_ID").isNull())

df_result.show()

+-------+-----------+------------+---------------+------------+----------------+--------------------+-------------------+--------------------+-------------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+------+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+
|     ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|    NAME_INCOME_TYPE|NAME_EDUCATION_TYPE|  NAME_FAMILY_STATUS|  NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|gcp_ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|NAME_INCOME_TYPE|NAME_EDUCATION_TYPE|NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|FLAG_WORK_PHONE|F

Lista de IDs que tem na tabela GCP e não na local:

In [8]:
df_result = df_gcp.join(df_local, df_gcp["gcp_ID"] == df_local["ID"], how='left')

# filtro onde o ID da tabela local for nulo
df_result = df_result.filter(F.col("ID").isNull())

df_result.show()

+------+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+---+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+
|gcp_ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|NAME_INCOME_TYPE|NAME_EDUCATION_TYPE|NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS| ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|NAME_INCOME_TYPE|NAME_EDUCATION_TYPE|NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCC

Contagem de IDs duplicados no GCP e ordenação decrescente:

In [9]:
# agregação dos IDs que aparecem mais de uma vez
df_grouped = df_gcp.groupBy("gcp_ID").agg(count("gcp_ID").alias("ID_Count"))
duplicated_ids = df_grouped.filter(col("ID_Count") > 1)
duplicate_count = duplicated_ids.count()
print("Número de IDs com mais de uma ocorrência: {}".format(duplicate_count))

df_ordered = duplicated_ids.orderBy(col("ID_Count").desc())
df_ordered.show()

Número de IDs com mais de uma ocorrência: 3000
+-------+--------+
| gcp_ID|ID_Count|
+-------+--------+
|5008957|       2|
|5009141|       2|
|5009198|       2|
|5009628|       2|
|5010058|       2|
|5010338|       2|
|5010568|       2|
|5010623|       2|
|5010674|       2|
|5010781|       2|
|5010801|       2|
|5010820|       2|
|5018477|       2|
|5021662|       2|
|5021682|       2|
|5021738|       2|
|5021818|       2|
|5021849|       2|
|5021878|       2|
|5021947|       2|
+-------+--------+
only showing top 20 rows



Análise de padrão das diferenças encontradas:

In [17]:
df_joined = df_local.join(df_gcp, df_local["ID"] == df_gcp["gcp_ID"], how='full_outer')

df_divergencias = df_joined.filter((F.col("ID").isNull()) | (F.col("gcp_ID").isNull()))

df_divergencias.show()

+-------+-----------+------------+---------------+------------+----------------+--------------------+-------------------+--------------------+-------------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+------+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+
|     ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|    NAME_INCOME_TYPE|NAME_EDUCATION_TYPE|  NAME_FAMILY_STATUS|  NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|gcp_ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|NAME_INCOME_TYPE|NAME_EDUCATION_TYPE|NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|FLAG_WORK_PHONE|F

Filtro usado para encontrar quais colunas apresentavam diferenças:

In [28]:
# retira os registros duplicados
df_gcp = df_gcp.dropDuplicates()

#join com condições
joined = df_local.join(df_gcp, (
    (df_local["ID"] == df_gcp["gcp_ID"]) &
    (df_local.FLAG_OWN_CAR == df_gcp.FLAG_OWN_CAR) &
    (df_local.FLAG_OWN_REALTY == df_gcp.FLAG_OWN_REALTY) &
    (df_local.CNT_CHILDREN == df_gcp.CNT_CHILDREN) &
    (df_local.NAME_INCOME_TYPE == df_gcp.NAME_INCOME_TYPE) &
    (df_local.NAME_EDUCATION_TYPE == df_gcp.NAME_EDUCATION_TYPE) &
    (df_local.NAME_FAMILY_STATUS == df_gcp.NAME_FAMILY_STATUS) &
    (df_local.NAME_HOUSING_TYPE == df_gcp.NAME_HOUSING_TYPE) &
    (df_local.DAYS_EMPLOYED == df_gcp.DAYS_EMPLOYED) &
    (df_local.FLAG_MOBIL == df_gcp.FLAG_MOBIL) &
    (df_local.FLAG_PHONE == df_gcp.FLAG_PHONE) &
    (df_local.FLAG_EMAIL == df_gcp.FLAG_EMAIL) &
    (df_local.CNT_FAM_MEMBERS == df_gcp.CNT_FAM_MEMBERS)
), "left")

filtered = joined.filter(col("gcp_ID").isNotNull())

#selecionar somente as colunas com divergência
df_selected = filtered.select(
    df_local["ID"],
    df_local["CODE_GENDER"].alias("tb_local_CODE_GENDER"),
    df_gcp["CODE_GENDER"].alias("tb_gcp_CODE_GENDER"),
    df_local["AMT_INCOME_TOTAL"].alias("tb_local_AMT_INCOME_TOTAL"),
    df_gcp["AMT_INCOME_TOTAL"].alias("tb_gcp_AMT_INCOME_TOTAL"),
    df_local["DAYS_BIRTH"].alias("tb_local_DAYS_BIRTH"),
    df_gcp["DAYS_BIRTH"].alias("tb_gcp_DAYS_BIRTH"),
    df_local["FLAG_WORK_PHONE"].alias("tb_local_FLAG_WORK_PHONE"),
    df_gcp["FLAG_WORK_PHONE"].alias("tb_gcp_FLAG_WORK_PHONE"),
    df_local["OCCUPATION_TYPE"].alias("tb_local_OCCUPATION_TYPE"),
    df_gcp["OCCUPATION_TYPE"].alias("tb_gcp_OCCUPATION_TYPE")
)

df_selected.show()

+-------+--------------------+------------------+-------------------------+-----------------------+-------------------+-----------------+------------------------+----------------------+------------------------+----------------------+
|     ID|tb_local_CODE_GENDER|tb_gcp_CODE_GENDER|tb_local_AMT_INCOME_TOTAL|tb_gcp_AMT_INCOME_TOTAL|tb_local_DAYS_BIRTH|tb_gcp_DAYS_BIRTH|tb_local_FLAG_WORK_PHONE|tb_gcp_FLAG_WORK_PHONE|tb_local_OCCUPATION_TYPE|tb_gcp_OCCUPATION_TYPE|
+-------+--------------------+------------------+-------------------------+-----------------------+-------------------+-----------------+------------------------+----------------------+------------------------+----------------------+
|5008895|                   F|            Female|                 297000.0|             29700000.0|             -15519|            15519|                       0|                  null|                Laborers|              Laborers|
|5009246|                   F|            Female|               

Transformação das tabelas do GCP divergentes da tabela local:

In [23]:
df_transformed = df_gcp.select(
    col("gcp_ID").alias("ID"),
    when(col("CODE_GENDER") == "Male", "M")
    .when(col("CODE_GENDER") == "Female", "F")
    .otherwise(None).alias("CODE_GENDER"),
    col("FLAG_OWN_CAR"),
    col("FLAG_OWN_REALTY"),
    col("CNT_CHILDREN"),
    (col("AMT_INCOME_TOTAL") / 100).alias("AMT_INCOME_TOTAL"),
    col("NAME_INCOME_TYPE"),
    col("NAME_EDUCATION_TYPE"),
    col("NAME_FAMILY_STATUS"),
    col("NAME_HOUSING_TYPE"),
    (col("DAYS_BIRTH") * -1).alias("DAYS_BIRTH"),
    col("DAYS_EMPLOYED"),
    col("FLAG_MOBIL"),
    col("FLAG_WORK_PHONE"),
    col("FLAG_PHONE"),
    col("FLAG_EMAIL"),
    when(col("OCCUPATION_TYPE") == "Without Occupation", "")
    .otherwise(col("OCCUPATION_TYPE")).alias("OCCUPATION_TYPE"),
    col("CNT_FAM_MEMBERS")
).distinct()

df_transformed.show()


+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+
|     ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|    NAME_INCOME_TYPE| NAME_EDUCATION_TYPE|  NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|
+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+
|6153775|          F|           N|              Y|           0|        297000.0|Commercial associate|Secondary / secon...|Single / not married| Rented apartment|  -15519.0|        -3234|         1