In [1]:
import os
import shutil
import datetime
import pandas as pd
from pyspark import SparkContext, SparkConf, HiveContext
from pyspark.sql import SparkSession, Row, functions, Column
from pyspark.sql.types import StringType 
import pyspark.sql.functions as F

### Processors

In [20]:
def process_scd2(ndata, sdata):
    ndata = ndata.withColumnRenamed('Osm_id', 'new_Osm_id') \
        .withColumnRenamed('Par_osm_id', 'new_Par_osm_id') \
        .withColumnRenamed('Name', 'new_Name') \
        .withColumnRenamed('Level', 'new_Level') \
        .withColumnRenamed('val_to', 'new_val_to') \
        .withColumnRenamed('val_from', 'new_val_from')
    
    merged = ndata.join(sdata, sdata.Osm_id == ndata.new_Osm_id, how='fullouter')

    merged = merged.withColumn(
        'action', 
        F.when(merged.new_Osm_id.isNull(), 'keepold')
        .when(merged.Osm_id.isNull(), 'insertnew')
        .when(merged.new_Level != merged.Level, 'update')
        .otherwise('noaction')
    )

    columns = ['Osm_id', 'Par_osm_id', 'Name', 'Level', 'val_to', 'val_from']

    # process noaction
    df_noact = merged.filter('action="noaction"').select(columns)

    # process keepold
    df_keep = merged.filter('action="keepold"').select(
        merged.Osm_id,
        merged.Par_osm_id,
        merged.Name,
        merged.Level,
        merged.val_to,
        merged.val_from
    )

    # process insertnew
    df_new = merged.filter('action="insertnew"').select(
        merged.new_Osm_id.alias('Osm_id'),
        merged.new_Par_osm_id.alias('Par_osm_id'),
        merged.new_Name.alias('Name'),
        merged.new_Level.alias('Level'),
        merged.new_val_to.alias('val_to'),
        merged.new_val_from.alias('val_from')
    )

    # process update
    df_upd = merged.filter('action="update"')

    df_upd_old = df_upd.select(
        df_upd.Osm_id,
        df_upd.Par_osm_id,
        df_upd.Name,
        df_upd.Level,
        df_upd.val_to,
        df_upd.new_val_to.alias('val_from')
    )

    df_upd_new = df_upd.select(
        df_upd.new_Osm_id.alias('Osm_id'),
        df_upd.new_Par_osm_id.alias('Par_osm_id'),
        df_upd.new_Name.alias('Name'),
        df_upd.new_Level.alias('Level'),
        df_upd.new_val_to.alias('val_to'),
        df_upd.new_val_from.alias('val_from')
    )

    full_merged = df_noact.unionAll(df_keep).unionAll(df_new).unionAll(df_upd_old).unionAll(df_upd_new)
    return full_merged


def _get_last_nonnull_id(*osm_ids):
    for osm_id in osm_ids:
        if not pd.isna(osm_id):
            return osm_id
_last_nonnull_udf = F.udf(lambda osm_ids: _get_last_nonnull_id(*osm_ids), StringType())        

class SparkLoader:
    def __init__(self, memory=2, cores=2, driver_memory=2, app_name='osm'):
        self._memory = memory
        self._cores = cores
        self._driver_memory = driver_memory
        self._app_name = app_name
        self._udf_cols = list(reversed([F.col(f'ADMIN_L{i}D') for i in range(1, 11)]))
        self._session = self._create_session()
    
    @property
    def session(self):
        return self._session
    
    def _create_session(self):
        spark_conf = SparkConf().setAppName(self._app_name)
        spark_conf.set('spark.executor.memory', f'{self._memory}g')
        spark_conf.set('spark.executor.cores', f'{self._cores}')
        spark_conf.set('spark.driver.memory', f'{self._driver_memory}g')
        spark_conf.set('spark.driver.extraClassPath', '/home/ripper/postgresql-42.2.19.jar')
        spark_conf.set('spark.jars.packages', 'org.postgresql:postgresql:42.2.19')
        return SparkSession.Builder().config(conf=spark_conf).getOrCreate()
    
    def process_csv_data(self, path):
        df = self._session.read.options(header=True, delimiter=';').csv(path)
        return df.withColumn('Par_osm_id', _last_nonnull_udf(F.array(self._udf_cols))) \
            .withColumnRenamed('OSM_ID', 'Osm_id') \
            .withColumnRenamed('NAME', 'Name') \
            .withColumnRenamed('ADMIN_LVL', 'Level') \
            .withColumn('val_to', F.lit(datetime.datetime.now())) \
            .withColumn('val_from', F.lit(datetime.datetime.max)) \
            .select('Osm_id', 'Par_osm_id', 'Name', 'Level', 'val_to', 'val_from')
    
    def write_data(self, df, filepath='osm.parquet', mode='append'):
        df.write.mode('append').parquet(filepath)
    
    def read_parquet(self, path='osm.parquet'):
        return self._session.read.parquet(path)
    
    def write2dbase(self, df, table='test'):
        df.write.format("jdbc").mode('overwrite') \
            .options(
                     url='jdbc:postgresql://localhost:5454/',
                     dbtable=table,
                     user='postgres',
                     password='docker',
                     driver='org.postgresql.Driver'
            ).saveAsTable(table)
    
    def read4dbase(self, df, table='test'):
        return self._session.read.format("jdbc"). \
        options(
                 url='jdbc:postgresql://localhost:5454/',
                 dbtable=table,
                 user='postgres',
                 password='docker',
                 driver='org.postgresql.Driver'
        ).load()
    

## Main part

### process files and write parquet

In [3]:
dir_path = 'data'
files = list(map(lambda x: os.path.join(dir_path, x),os.listdir(dir_path)))

In [21]:
loader = SparkLoader()

In [5]:
if os.path.exists('osm.parquet'):
    shutil.rmtree('osm.parquet')
df = None
for file in files:
    if df is None:
        df = loader.process_csv_data(file)
    else:
        df = df.union(loader.process_csv_data(file))

In [6]:
old_data = loader.session.read.format("jdbc"). \
options(
         url='jdbc:postgresql://localhost:5454/',
         dbtable='test',
         user='postgres',
         password='docker',
         driver='org.postgresql.Driver'
).load()

In [7]:
old_data.show()

+-------+----------+--------------------+-----+--------------------+--------------------+
| Osm_id|Par_osm_id|                Name|Level|              val_to|            val_from|
+-------+----------+--------------------+-----+--------------------+--------------------+
|5827722|   1430613|Правобережный адм...|    9|2021-03-21 06:30:...|9999-12-31 23:59:...|
|5827642|   1430613|Октябрьский админ...|    9|2021-03-21 06:30:...|9999-12-31 23:59:...|
|5826259|   1430613|Ленинский админис...|    9|2021-03-21 06:30:...|9999-12-31 23:59:...|
|5827226|   1430613|Свердловский адми...|    9|2021-03-21 06:30:...|9999-12-31 23:59:...|
| 145454|   1221148|   Иркутская область|    4|2021-03-21 06:30:...|9999-12-31 23:59:...|
|  60189|      null|              Россия|    2|2021-03-21 06:30:...|9999-12-31 23:59:...|
|5756862|   1456786|сельское поселени...|    8|2021-03-21 06:30:...|9999-12-31 23:59:...|
|5756859|   1456786|сельское поселени...|    8|2021-03-21 06:30:...|9999-12-31 23:59:...|
|5756857| 

In [11]:
df = process_scd2(df, old_data)

In [13]:
df.count()

462

In [12]:
loader.write_data(df)

In [15]:
loader.read_parquet().show()

+-------+----------+--------------------+-----+--------------------+--------------------+
| Osm_id|Par_osm_id|                Name|Level|              val_to|            val_from|
+-------+----------+--------------------+-----+--------------------+--------------------+
|5730689|   1464421|Новоигирминское г...|    8|2021-03-21 06:30:...|9999-12-31 23:59:...|
|5738215|    190110|Староалзамайское ...|    8|2021-03-21 06:30:...|9999-12-31 23:59:...|
|5722707|   1454435|Петропавловское с...|    8|2021-03-21 06:30:...|9999-12-31 23:59:...|
|1104995|   1104970|Портбайкальское с...|    8|2021-03-21 06:30:...|9999-12-31 23:59:...|
|5720461|   1459545|Голоустненское се...|    8|2021-03-21 06:30:...|9999-12-31 23:59:...|
|5747207|   1463469|Большееланское се...|    8|2021-03-21 06:30:...|9999-12-31 23:59:...|
|5753790|   1458139|Новостроевское се...|    8|2021-03-21 06:30:...|9999-12-31 23:59:...|
|5747206|   1463469|Белореченское гор...|    8|2021-03-21 06:30:...|9999-12-31 23:59:...|
|5719489| 

### write to dbase

In [22]:
loader.write2dbase(df)

In [23]:
table = loader.session.read.format("jdbc"). \
options(
         url='jdbc:postgresql://localhost:5454/',
         dbtable='test',
         user='postgres',
         password='docker',
         driver='org.postgresql.Driver'
).load()

In [24]:
table.show()

+-------+----------+--------------------+-----+--------------------+--------------------+
| Osm_id|Par_osm_id|                Name|Level|              val_to|            val_from|
+-------+----------+--------------------+-----+--------------------+--------------------+
|1456789| 3438290.0|  Баяндаевский район|    6|2021-03-21 08:40:...|9999-12-31 23:59:...|
|1460653|    145454|   Балаганский район|    6|2021-03-21 08:40:...|9999-12-31 23:59:...|
|5720474|   1459545|Сосновоборское се...|    8|2021-03-21 08:40:...|9999-12-31 23:59:...|
|5721326|   1456241|Бирюльское сельск...|    8|2021-03-21 08:40:...|9999-12-31 23:59:...|
|5728519|   1464422|Зябинское сельско...|    8|2021-03-21 08:40:...|9999-12-31 23:59:...|
|5756859|   1456786|сельское поселени...|    8|2021-03-21 08:40:...|9999-12-31 23:59:...|
|5752233|   1454692|Усть-Кутское горо...|    8|2021-03-21 08:40:...|9999-12-31 23:59:...|
|1469711|    145454|городской округ Б...|    6|2021-03-21 08:40:...|9999-12-31 23:59:...|
|5739676| 

### Test SCD2

In [102]:
sdata = [
    [
        '5827722',
        '1430613',
        'Правобережный административный округ',
        '8',
        datetime.datetime.now(),
        datetime.datetime(2025, 12, 31, 23, 00, 00)
    ],
    [
        '5827721',
        '1430613',
        'Левобережный административный округ',
        '8',
        datetime.datetime.now(),
        datetime.datetime(2025, 12, 31, 23, 00, 00)
    ],
    [
        '582771',
        '1430613',
        'gdfдминистративный округ',
        '8',
        datetime.datetime.now(),
        datetime.datetime(2025, 12, 31, 23, 00, 00)
    ]
    
]
source_data = pd.DataFrame(sdata, columns=['Osm_id', 'Par_osm_id', 
                                           'Name', 'Level', 'val_to', 'val_from'])

ndata = [
    [
        '5827722',
        '1430613',
        'Правобережный административный округ',
        '7',
        datetime.datetime(2022, 12, 12,23, 00, 00),
        datetime.datetime(2025, 12, 31, 23, 00, 00)
    ],
    [
        '5827720',
        '1430614',
        'sa административный округ',
        '7',
        datetime.datetime(2022, 12, 12,23, 00, 00),
        datetime.datetime(2025, 12, 31, 23, 00, 00)
    ],
    [
        '582771',
        '1430613',
        'gdfдминистративный округ',
        '4',
        datetime.datetime(2022, 12, 12,23, 00, 00),
        datetime.datetime(2025, 12, 31, 23, 00, 00)
    ]
]
new_data = pd.DataFrame(ndata, columns=['new_Osm_id', 'new_Par_osm_id', 
                                           'new_Name', 'new_Level', 'new_val_to', 'new_val_from'])

In [103]:
sdata = loader.session.createDataFrame(source_data)
ndata = loader.session.createDataFrame(new_data)

In [104]:
merged = ndata.join(sdata, sdata.Osm_id == ndata.new_Osm_id, how='fullouter')

merged = merged.withColumn(
    'action', 
    F.when(merged.new_Osm_id.isNull(), 'keepold')
    .when(merged.Osm_id.isNull(), 'insertnew')
    .when(merged.new_Level != merged.Level, 'update')
    .otherwise('noaction')
)

columns = ['Osm_id', 'Par_osm_id', 'Name', 'Level', 'val_to', 'val_from']

# process noaction
df_noact = merged.filter('action="noaction"').select(columns)

# process keepold
df_keep = merged.filter('action="keepold"').select(
    merged.Osm_id,
    merged.Par_osm_id,
    merged.Name,
    merged.Level,
    merged.val_to,
    merged.val_from
)

# process insertnew
df_new = merged.filter('action="insertnew"').select(
    merged.new_Osm_id.alias('Osm_id'),
    merged.new_Par_osm_id.alias('Par_osm_id'),
    merged.new_Name.alias('Name'),
    merged.new_Level.alias('Level'),
    merged.new_val_to.alias('val_to'),
    merged.new_val_from.alias('val_from')
)

# process update
df_upd = merged.filter('action="update"')

df_upd_old = df_upd.select(
    df_upd.Osm_id,
    df_upd.Par_osm_id,
    df_upd.Name,
    df_upd.Level,
    df_upd.val_to,
    df_upd.new_val_to.alias('val_from')
)

df_upd_new = df_upd.select(
    df_upd.new_Osm_id.alias('Osm_id'),
    df_upd.new_Par_osm_id.alias('Par_osm_id'),
    df_upd.new_Name.alias('Name'),
    df_upd.new_Level.alias('Level'),
    df_upd.new_val_to.alias('val_to'),
    df_upd.new_val_from.alias('val_from')
)

full_merged = df_noact.unionAll(df_keep).unionAll(df_new).unionAll(df_upd_old).unionAll(df_upd_new)

In [113]:
full_merged.toPandas()

Unnamed: 0,Osm_id,Par_osm_id,Name,Level,val_to,val_from
0,5827721,1430613,Левобережный административный округ,8,2021-03-21 08:33:19.164631,2025-12-31 23:00:00
1,5827720,1430614,sa административный округ,7,2022-12-12 23:00:00.000000,2025-12-31 23:00:00
2,5827722,1430613,Правобережный административный округ,8,2021-03-21 08:33:19.164619,2022-12-12 23:00:00
3,582771,1430613,gdfдминистративный округ,8,2021-03-21 08:33:19.164632,2022-12-12 23:00:00
4,5827722,1430613,Правобережный административный округ,7,2022-12-12 23:00:00.000000,2025-12-31 23:00:00
5,582771,1430613,gdfдминистративный округ,4,2022-12-12 23:00:00.000000,2025-12-31 23:00:00
