# Общее описание решения
 
Для сравнения двух источников нужно данные из них куда-то загрузить и потом сравнить строки одной и другой таблицы. Проще всего это сделать с помощью SQL, так эта операция соответствует FULL JOIN двух таблиц с условием по всем полям сравнения. При большем кол-ве полей сравнения такой джойн может быть "тяжелым" для БД и можно его упростить. Склеиваем все поля сравнения в виде строк и считаем от этого поля hash. Join делаем по этому полю - БД делает или hash или merge join (в зависимости от размера таблиц) и он максимально эффективен.  Теоретически при таком подходе возможна коллизия, но она мало вероятна.
 
 
**Требование 1: как минимум один - таблица БД, второй на выбор**
 
*  Берем базу PG
*  Берем csv файл
 
 
**Требование 2: должен присутствовать уникальный ключ - uid/id на выбор**
 
У меня в данных есть и id и uid, но уникальным будет только id.
 
**Требование 3: Реконсиляция должна быть масштабируема и применима к Big Data**

Для этого буду использовать Apache Spark. Умеет читать данные как из файлов так и из большинства БД. Дальше все данные будут сравниваться в ОЗУ с помощью Spark SQL.При увеличении объема данных для сравнения такое решение будет линейно масштабируемая - добавляем новые воркеры для Spark и продолжаем работу. Если будет необходимо надежное промежуточное хранение результатов - можно использовать любую реализацию s3 или hdfs (которые сами по себе позволяют хранить большие данные надежно в распределенной системе с резервированием).
 
**Дополнительное требование 1: для разных типов данных (дата, текст, числа)**
 
Мы все поля склеиваем и считаем hash. Для этого они сначала преобразуются к строке и различие в типе данных перестает иметь значение.
 
**Дополнительное требование 2: для числовых данных должна быть возможность сконфигурировать толеранс (допустимую погрешность в %)**
 
Для сравнения числовых полей с погрешностью - не включаем их в условия джойна, а считаем процентную разницу между таблицами и сравниваем с порогом. В рамках этой задачи я использую один порог для всех таких полей, но можно реализовать и отдельно для каждого поля.



# Загрузка и установка Spark

In [1]:
# install Java8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# download spark3.1.1
!wget -q https://www.mirrorservice.org/sites/ftp.apache.org/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
# unzip it
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
# install findspark and pyspark
!pip install -q findspark
!pip install pyspark



# Запуск и настройка Spark

In [29]:
# imports and spark coniguration
import os
os.environ["JAVA_HOME"] = '/usr/lib/jvm/java-8-openjdk-amd64' 
os.environ["SPARK_HOME"] = 'spark-3.1.1-bin-hadoop3.2'

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.postgresql:postgresql:42.2.19 pyspark-shell' # драйвер для работы с PG

import findspark
findspark.init()

from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import SQLContext

from pyspark.sql.types import StructType, IntegerType, StringType, DoubleType



spark = SparkSession.builder.master("local[*]").getOrCreate()
sqlContext = SQLContext(spark)

# Определим общие настройки

In [189]:
t1_name = 'datasource1'
t1_schema_name = 'reconciliation'

t2_name = 'datasource2'

result_table_name = 'diff_table'

# настройка полей стравнения
params = {
    'hash_fields': ['StockCode', 'Description', 'Quantity', 'InvoiceDate', 'uid', 'Country'], # поля, от которых будем считать hash
    'numeric_fields': ['UnitPrice'], # поля, по которым будем считать допустимую погрешность. Должны не пересекаться с hash_fields
    'pr': 0.1 # значение допустимой погрешности

}

# Загрузка данных из CSV и PG в Spark

In [188]:
# Загрузим данные из PG и сохраним во временную таблицу Spark
def create_table_from_pg(name, schema_name, sc=sqlContext):
  
  df = sc.read\
      .format("jdbc")\
      .option("url", "jdbc:postgresql://34.71.247.247:5432/home_test")\
      .option("dbtable", f"{schema_name}.{name}")\
      .option("user", "postgres")\
      .option("password", "12345678")\
      .option("driver", "org.postgresql.Driver")\
      .load()
  
  df.createOrReplaceTempView(name)

  print(df.printSchema())

  print (sc.sql(f'SELECT * FROM {name}').show(5))
   
create_table_from_pg(t1_name, t1_schema_name)

root
 |-- id: integer (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- uid: integer (nullable = true)
 |-- Country: string (nullable = true)

None
+------+---------+--------------------+--------+------------------+---------+-----+--------------+
|    id|StockCode|         Description|Quantity|       InvoiceDate|UnitPrice|  uid|       Country|
+------+---------+--------------------+--------+------------------+---------+-----+--------------+
|536366|   84406B|CREAM CUPID HEART...|       8|01.12.2010 8:26:00|     2.75|17850|United Kingdom|
|536367|   84029G|KNITTED UNION FLA...|       6|01.12.2010 8:26:00|     3.39|17851|United Kingdom|
|536371|    22544|MINI JIGSAW SPACEBOY|      24|01.12.2010 8:45:00|     0.42|12583|        France|
|536372|    22492|MINI PAINT SET VI...|      36|01.12.2010 8:45:00|     0.65

In [170]:
# загрузим csv локально для загрузки в Spark
!wget -q 'https://raw.githubusercontent.com/eugeneks/home_test_202103/main/datasource2.csv'

!ls

datasource2	   datasource2.csv.2  spark-3.1.1-bin-hadoop3.2
datasource2.csv    datasource2.csv.3  spark-3.1.1-bin-hadoop3.2.tgz
datasource2.csv.1  sample_data	      spark-3.1.1-bin-hadoop3.2.tgz.1


In [190]:
# Загрузим данные из CSV и сохраним во временную таблицу Spark
def create_table_from_csv(name, schema, sc=sqlContext):

  df = sc.read.format("csv")\
      .option("header", "true")\
      .option("sep", ',') \
      .schema(schema) \
      .load(f'./{name}.csv')

  df.createOrReplaceTempView(name)

  print(df.printSchema())

  print (sc.sql(f'SELECT * FROM {name}').show(5))

datasource2_schema = sqlContext.sql(f'SELECT * FROM {t1_name}').schema # берем схему таблицы из PG - чтоб не было проблем с соотвествием типов данных

   
create_table_from_csv(t2_name, datasource2_schema )

root
 |-- id: integer (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- uid: integer (nullable = true)
 |-- Country: string (nullable = true)

None
+------+---------+--------------------+--------+------------------+---------+-----+--------------+
|    id|StockCode|         Description|Quantity|       InvoiceDate|UnitPrice|  uid|       Country|
+------+---------+--------------------+--------+------------------+---------+-----+--------------+
|536365|    71053| WHITE METAL LANTERN|       6|01.12.2010 8:26:00|     3.39|17850|        France|
|536366|   84406B|CREAM CUPID HEART...|       8|01.12.2010 8:26:00|      2.9|17850|United Kingdom|
|536367|   84029G|KNITTED UNION FLA...|       6|01.12.2010 8:26:00|     3.39|17851|United Kingdom|
|536371|    22544|MINI JIGSAW SPACEBOY|      24|01.12.2010 8:45:00|     0.42

#  Реализуем логику сравнения

In [191]:
# ищем разницу между таблицами
def find_diff(name, sc=sqlContext, num_rows=100):

  # подстановка для полей, по которым будем считать hash для join в SQL
  join_id = ', '.join(params['hash_fields'])


  diff_fields = ''
  field_pos =0

  # подстановка для полей, по которым будем считать погрешность в SQL
  for diff_field in params['numeric_fields']:
    field_pos = field_pos + 1
    diff_fields = diff_fields + f'\n , COALESCE(abs(t1.{diff_field}-t2.{diff_field})/ greatest(t1.{diff_field}, t2.{diff_field}), 0) AS diff_{field_pos}'
  
  sql = f"""
  with t1 AS (
    SELECT *
        , ABS(hash(CONCAT ({join_id}))) as join_id
        , '{t1_name}'           AS datasource
    FROM 
      {t1_name}
  )
  , t2 AS(
    SELECT *
      , ABS(hash(CONCAT ({join_id}))) as join_id
      , '{t2_name}' AS datasource
    FROM 
      {t2_name}

  )
  SELECT
    t1.id AS t1_id
    , t1.datasource
    ,  t1.join_id AS t1_join_id
    , t2.id AS t2_id
    , t2.datasource
    , t2.join_id AS t2_join_id
    {diff_fields}
  FROM
    t1
  FULL JOIN
    t2
  ON
    t1.id =t2.id
    AND t1.join_id = t2.join_id
    AND t1.datasource != t2.datasource
  ORDER BY 
    COALESCE(t1.id, t2.id), COALESCE(t1.datasource, t2.datasource)
  """

  df =sc.sql(sql)

  df.createOrReplaceTempView(name)

  print('----таблица сравнения----')

  print (sc.sql(f'SELECT * FROM {name}').show(num_rows))


In [192]:
# показываем отличающиеся строки в обоих датасетах 
def show_diff_rows(sc=sqlContext, num_rows=100):


  # строки отличающиеся по Hash
  sql = f"""
  SELECT
    ds.*
    , '{t1_name}' AS datasource 
  FROM
    {t1_name} AS ds
  WHERE 
    ds.id IN (SELECT t1_id FROM {result_table_name} WHERE t2_id IS NULL)

  UNION ALL

  SELECT
    ds.*
    , '{t2_name}' AS datasource 
  FROM
    {t2_name} AS ds
  WHERE 
    ds.id IN (SELECT t2_id FROM {result_table_name} WHERE t1_id IS NULL)
  """


  # строки отличающиеся по порогу для числовых полей 
  diff_fields = ''
  field_pos =0

  for diff_field in params['numeric_fields']:
      field_pos = field_pos + 1
      diff_fields =  f"\n AND (diff_{field_pos}>={params['pr']})"

      sql = sql +f"""

      UNION ALL
      
      SELECT
        ds.*
        , '{t1_name}' AS datasource 
      FROM
        {t1_name}  AS ds
      WHERE 
        ds.id IN (SELECT t1_id FROM {result_table_name} WHERE t1_id IS NOT NULL {diff_fields} )

      UNION ALL
      
      SELECT
        ds.*
        , '{t2_name}' AS datasource 
      FROM
        {t2_name}  AS ds
      WHERE 
        ds.id IN (SELECT t2_id FROM {result_table_name} WHERE t2_id IS NOT NULL {diff_fields}) 
    """
  sql = sql + "\n ORDER BY id, datasource"

  print('----Отличающиеся строки----')

  sc.sql(sql).show(num_rows)

# Тестируем

In [193]:
# смотрим  результаты при исходных настройках

find_diff(result_table_name)
show_diff_rows()

----таблица сравнения----
+------+-----------+----------+------+-----------+----------+--------------------+
| t1_id| datasource|t1_join_id| t2_id| datasource|t2_join_id|              diff_1|
+------+-----------+----------+------+-----------+----------+--------------------+
|  null|       null|      null|536365|datasource2| 276757883|                 0.0|
|536366|datasource1| 222762075|536366|datasource2| 222762075|0.051724137931034454|
|536367|datasource1|1745396149|536367|datasource2|1745396149|                 0.0|
|536371|datasource1| 269447997|536371|datasource2| 269447997|                 0.0|
|536372|datasource1| 389887883|  null|       null|      null|                 0.0|
|536373|datasource1|  10963135|  null|       null|      null|                 0.0|
|  null|       null|      null|536373|datasource2| 677451890|                 0.0|
+------+-----------+----------+------+-----------+----------+--------------------+

None
----Отличающиеся строки----
+------+---------+---------

Видим:


*  строку с id = 536365 - она есть только в datasource2 
*  строку с id = 536372 - она есть только в datasource1
*  у строк с id =536373 различаются Quantity
* у строк с id = 536366  отличаются цены, но для нас они не различимы так как отличие всего в 0.0517, а порог у нас 0.1





In [194]:
params = {
    'hash_fields': ['StockCode', 'Description', 'Quantity', 'InvoiceDate', 'uid', 'Country'], # поля, от которых будем считать hash
    'numeric_fields': ['UnitPrice'], # поля, по которым будем считать допустимую погрешность. Должны не пересекаться с hash_fields
    'pr': 0.01 # значение допустимой погрешности

}
find_diff(result_table_name)
show_diff_rows()

----таблица сравнения----
+------+-----------+----------+------+-----------+----------+--------------------+
| t1_id| datasource|t1_join_id| t2_id| datasource|t2_join_id|              diff_1|
+------+-----------+----------+------+-----------+----------+--------------------+
|  null|       null|      null|536365|datasource2| 276757883|                 0.0|
|536366|datasource1| 222762075|536366|datasource2| 222762075|0.051724137931034454|
|536367|datasource1|1745396149|536367|datasource2|1745396149|                 0.0|
|536371|datasource1| 269447997|536371|datasource2| 269447997|                 0.0|
|536372|datasource1| 389887883|  null|       null|      null|                 0.0|
|536373|datasource1|  10963135|  null|       null|      null|                 0.0|
|  null|       null|      null|536373|datasource2| 677451890|                 0.0|
+------+-----------+----------+------+-----------+----------+--------------------+

None
----Отличающиеся строки----
+------+---------+---------



* все тоже как в прошлом примере, но уменьшили порог и потому видим отличие для строк с id = 536366

In [195]:
params = {
    'hash_fields': ['StockCode', 'Description',  'InvoiceDate', 'uid', 'Country'], # поля, от которых будем считать hash
    'numeric_fields': ['UnitPrice', 'Quantity'], # поля, по которым будем считать допустимую погрешность. Должны не пересекаться с hash_fields
    'pr': 0.01 # значение допустимой погрешности

}
find_diff(result_table_name)
show_diff_rows()

----таблица сравнения----
+------+-----------+----------+------+-----------+----------+--------------------+------+
| t1_id| datasource|t1_join_id| t2_id| datasource|t2_join_id|              diff_1|diff_2|
+------+-----------+----------+------+-----------+----------+--------------------+------+
|  null|       null|      null|536365|datasource2|  72187578|                 0.0|   0.0|
|536366|datasource1| 570544821|536366|datasource2| 570544821|0.051724137931034454|   0.0|
|536367|datasource1|  26764253|536367|datasource2|  26764253|                 0.0|   0.0|
|536371|datasource1|1401062030|536371|datasource2|1401062030|                 0.0|   0.0|
|536372|datasource1| 250270336|  null|       null|      null|                 0.0|   0.0|
|536373|datasource1| 816965145|536373|datasource2| 816965145|                 0.0|  0.25|
+------+-----------+----------+------+-----------+----------+--------------------+------+

None
----Отличающиеся строки----
+------+---------+--------------------+-



* пробуем использовать пороги сразу для двух полей 'UnitPrice', 'Quantity' - разница больше 0.01 потому видим строки для id = 536373 и 536366

In [196]:
params = {
    'hash_fields': ['StockCode', 'Description',  'InvoiceDate', 'uid', 'Country'], # поля, от которых будем считать hash
    'numeric_fields': ['UnitPrice', 'Quantity'], # поля, по которым будем считать допустимую погрешность. Должны не пересекаться с hash_fields
    'pr': 0.1 # значение допустимой погрешности

}
find_diff(result_table_name)
show_diff_rows()

----таблица сравнения----
+------+-----------+----------+------+-----------+----------+--------------------+------+
| t1_id| datasource|t1_join_id| t2_id| datasource|t2_join_id|              diff_1|diff_2|
+------+-----------+----------+------+-----------+----------+--------------------+------+
|  null|       null|      null|536365|datasource2|  72187578|                 0.0|   0.0|
|536366|datasource1| 570544821|536366|datasource2| 570544821|0.051724137931034454|   0.0|
|536367|datasource1|  26764253|536367|datasource2|  26764253|                 0.0|   0.0|
|536371|datasource1|1401062030|536371|datasource2|1401062030|                 0.0|   0.0|
|536372|datasource1| 250270336|  null|       null|      null|                 0.0|   0.0|
|536373|datasource1| 816965145|536373|datasource2| 816965145|                 0.0|  0.25|
+------+-----------+----------+------+-----------+----------+--------------------+------+

None
----Отличающиеся строки----
+------+---------+--------------------+-


* поднимаем порог до 0.1 для id = 536373 все еще меньше разницы (0.25)   Для строк с id = 536366 мы уже не видим разницу.

In [197]:
params = {
    'hash_fields': ['StockCode', 'Description',  'InvoiceDate', 'uid', 'Country'], # поля, от которых будем считать hash
    'numeric_fields': ['UnitPrice', 'Quantity'], # поля, по которым будем считать допустимую погрешность. Должны не пересекаться с hash_fields
    'pr': 0.26 # значение допустимой погрешности

}
find_diff(result_table_name)
show_diff_rows()

----таблица сравнения----
+------+-----------+----------+------+-----------+----------+--------------------+------+
| t1_id| datasource|t1_join_id| t2_id| datasource|t2_join_id|              diff_1|diff_2|
+------+-----------+----------+------+-----------+----------+--------------------+------+
|  null|       null|      null|536365|datasource2|  72187578|                 0.0|   0.0|
|536366|datasource1| 570544821|536366|datasource2| 570544821|0.051724137931034454|   0.0|
|536367|datasource1|  26764253|536367|datasource2|  26764253|                 0.0|   0.0|
|536371|datasource1|1401062030|536371|datasource2|1401062030|                 0.0|   0.0|
|536372|datasource1| 250270336|  null|       null|      null|                 0.0|   0.0|
|536373|datasource1| 816965145|536373|datasource2| 816965145|                 0.0|  0.25|
+------+-----------+----------+------+-----------+----------+--------------------+------+

None
----Отличающиеся строки----
+------+---------+--------------------+-



* поднимаем порог до 0.26 строки с id = 536373 становятся для нас не различимыми

In [198]:
params = {
    'hash_fields': ['StockCode', 'Description',  'InvoiceDate', 'uid', 'Country', 'UnitPrice', 'Quantity'], # поля, от которых будем считать hash
    'numeric_fields': [], # поля, по которым будем считать допустимую погрешность. Должны не пересекаться с hash_fields
    'pr': 0.26 # значение допустимой погрешности

}
find_diff(result_table_name)
show_diff_rows()

----таблица сравнения----
+------+-----------+----------+------+-----------+----------+
| t1_id| datasource|t1_join_id| t2_id| datasource|t2_join_id|
+------+-----------+----------+------+-----------+----------+
|  null|       null|      null|536365|datasource2|  78091976|
|536366|datasource1|1762860193|  null|       null|      null|
|  null|       null|      null|536366|datasource2| 749749351|
|536367|datasource1|1661980297|536367|datasource2|1661980297|
|536371|datasource1| 314767742|536371|datasource2| 314767742|
|536372|datasource1| 991513814|  null|       null|      null|
|536373|datasource1|1844009968|  null|       null|      null|
|  null|       null|      null|536373|datasource2| 292433632|
+------+-----------+----------+------+-----------+----------+

None
----Отличающиеся строки----
+------+---------+--------------------+--------+------------------+---------+-----+--------------+-----------+
|    id|StockCode|         Description|Quantity|       InvoiceDate|UnitPrice|  uid|  



* убираем использование порогов для полей с числами. получаем все отличающиеся строки

In [199]:
params = {
    'hash_fields': ['StockCode', 'Description',  'InvoiceDate', 'uid', 'Country', 'UnitPrice'], # поля, от которых будем считать hash
    'numeric_fields': [], # поля, по которым будем считать допустимую погрешность. Должны не пересекаться с hash_fields
    'pr': 0.26 # значение допустимой погрешности

}
find_diff(result_table_name)
show_diff_rows()

----таблица сравнения----
+------+-----------+----------+------+-----------+----------+
| t1_id| datasource|t1_join_id| t2_id| datasource|t2_join_id|
+------+-----------+----------+------+-----------+----------+
|  null|       null|      null|536365|datasource2| 894994818|
|536366|datasource1| 120452655|  null|       null|      null|
|  null|       null|      null|536366|datasource2| 581121809|
|536367|datasource1| 333942080|536367|datasource2| 333942080|
|536371|datasource1|1153878540|536371|datasource2|1153878540|
|536372|datasource1|1303847876|  null|       null|      null|
|536373|datasource1|2028191744|536373|datasource2|2028191744|
+------+-----------+----------+------+-----------+----------+

None
----Отличающиеся строки----
+------+---------+--------------------+--------+------------------+---------+-----+--------------+-----------+
|    id|StockCode|         Description|Quantity|       InvoiceDate|UnitPrice|  uid|       Country| datasource|
+------+---------+------------------

* убираем использование порогов для полей с числами. убираем Quantity из  полей сравнения. строки с id =536373 стали для нас не различимы.