In [1]:
import os
import sys
# Resolve the current user and the LABDATA checkout, then extend sys.path so the
# project-local `lib` package and the per-user libraries can be imported.
curruser = os.environ.get('USER')

_labdata = os.environ.get("LABDATA_PYSPARK")
if not _labdata:
    # Fail fast with a readable message: otherwise None is pushed onto sys.path
    # and os.chdir(None) dies with an opaque TypeError.
    raise RuntimeError("Environment variable LABDATA_PYSPARK is not set")
sys.path.insert(0, _labdata)
os.chdir(_labdata)

# Probe the user's workspace entry directly. os.listdir("/opt/workspace/") raises
# when that directory does not exist, which made the /home fallback below
# unreachable on hosts without /opt/workspace.
if curruser and os.path.lexists(os.path.join("/opt/workspace", curruser)):
    sys.path.insert(0, '/opt/workspace/{user}/notebooks/support_library/'.format(user=curruser))
    sys.path.insert(0, '/opt/workspace/{user}/libs/python3.5/site-packages/'.format(user=curruser))
    # sys.path.insert(0, '/opt/workspace/{user}/notebooks/labdata_v1.2/lib/'.format(user=curruser))
else:
    sys.path.insert(0, '/home/{}/notebooks/support_library/'.format(curruser))
    sys.path.insert(0, '/home/{}/python35-libs/lib/python3.5/site-packages/'.format(curruser))
    # sys.path.insert(0, '/home/{}/notebooks/labdata/lib/'.format(curruser))

#import tendo.singleton
import warnings
# Suppress every warning for the rest of the session. This keeps cell output short,
# but deprecation notices from the libraries imported below are hidden as well.
warnings.filterwarnings('ignore')

import joblib
import json
from joblib import Parallel, delayed

from time import sleep
from itertools import islice
from multiprocessing import Pool, Process, JoinableQueue
from multiprocessing.pool import ThreadPool
from functools import partial
import subprocess
from threading import Thread
import time
from datetime import datetime as dt

from transliterate import translit

from lib.spark_connector import SparkConnector
from lib.sparkdb_loader import *
from lib.connector import OracleDB
import pyspark
from pyspark import SparkContext, SparkConf, HiveContext
from pyspark.sql.window import Window
from pyspark.sql.functions import *
import pyspark.sql.functions as f
from pyspark.sql.types import *
from pyspark.sql.dataframe import DataFrame

import re
import pandas as pd
import numpy as np
from tqdm._tqdm_notebook import tqdm_notebook
from pathlib import Path
import shutil
import loader as load
from collections import ChainMap

from lib.config import *
from lib.tools import *

# sing = tendo.singleton.SingleInstance()

# os.chdir('/opt/workspace/ektov1-av_ca-sbrf-ru/notebooks/Clickstream_Analytics/AutoUpdate/')
# os.chdir('/opt/workspace/{}/notebooks/clickstream/AutoUpdate/'.format(curruser))

def show(self, n=10):
    '''Return the first `n` rows of `self` (default 10) converted with toPandas().'''
    preview = self.limit(n)
    return preview.toPandas()

def typed_udf(return_type):
    '''Build a decorator that wraps a function with f.udf using `return_type`.

    Usage: place `@typed_udf(SomeType())` above a plain Python function; the
    decorated name is then whatever `f.udf(func, return_type)` returns.
    '''
    return lambda func: f.udf(func, return_type)

# Monkey-patch: every Spark DataFrame's .show() now runs the `show` helper defined
# in this cell, i.e. it returns limit(n).toPandas().
pyspark.sql.dataframe.DataFrame.show = show

def print_and_log(message: str):
    '''Print `message` to stdout, then pass the same text to `logger.info`.

    `logger` is a global that is not defined in this cell (presumably provided
    by one of the star imports - TODO confirm). Always returns None.
    '''
    print(message)
    logger.info(message)

# Schema handed to spark(...) as `schema` when the session is created in the next cell.
# The trailing comment holds another schema name, presumably an alternative target.
CONN_SCHEMA = 'sbx_team_digitcamp' #'sbx_t_team_cvm'


In [2]:
# Session settings gathered in one mapping so they can be reviewed and tuned at a glance.
spark_settings = dict(
    schema=CONN_SCHEMA,
    dynamic_alloc=False,
    numofinstances=10,
    numofcores=8,
    executor_memory='35g',
    driver_memory='35g',
    kerberos_auth=True,
    process_label="TEST_PYSPARK_",
)
sp = spark(**spark_settings)

# `hive` is the SQL entry point used by every later cell (hive.sql / hive.table / hive.setConf).
hive = sp.sql
print(sp.sc.version)

# __init__ : begin
2.4.0.cloudera2


In [9]:
# Session options for dynamic-partition inserts into Hive tables.
hive.setConf("hive.exec.dynamic.partition", "true")
hive.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
hive.setConf("hive.enforce.bucketing", "false")
hive.setConf("hive.enforce.sorting", "false")
# The key used to be misspelled ("partitionOverwiteMode"), so this option was never
# actually set; the documented Spark name is partitionOverwriteMode.
hive.setConf("spark.sql.sources.partitionOverwriteMode", "dynamic")
# hive.setConf("hive.exec.stagingdir", "/tmp/{}/".format(curruser))
# hive.setConf("hive.exec.scratchdir", "/tmp/{}/".format(curruser))
hive.setConf("hive.load.dynamic.partitions.thread", 1)

In [3]:
# Source DataFrame: its dtypes drive the DDL of the partitioned table built below,
# and its rows are what insertToSDF later writes into that table.
sdf = hive.table('sbx_team_digitcamp.cid_inn_insert_2021_09_10')

## Create Partitioned SDF

In [4]:
def createSDF(conn_schema, target_tbl, insert, part_tupl_lst):
    '''Create the partitioned table `conn_schema.target_tbl` through the global `hive` handle.

    `insert` is spliced verbatim as the column definitions and `part_tupl_lst`
    verbatim into the PARTITIONED BY clause, so both must already be SQL text
    such as "col_a string, col_b bigint". Nothing is returned.
    '''
    ddl_template = '''create table {schema}.{tbl} (
                                             {fields}
                                                )
                 PARTITIONED BY ({part_col_lst})
             '''
    ddl = ddl_template.format(schema=conn_schema,
                              tbl=target_tbl,
                              fields=insert,
                              part_col_lst=part_tupl_lst)
    hive.sql(ddl)

In [5]:
def insertToSDF(sdf, conn_schema, tmp_tbl, target_tbl, part_col_lst):
    """Write the rows of `sdf` into `conn_schema.target_tbl` with INSERT OVERWRITE.

    Parameters
    ----------
    sdf : Spark DataFrame to load; it is exposed to SQL as the temp view `tmp_tbl`.
    conn_schema, target_tbl : schema and name of the table to write to.
    tmp_tbl : temp view name; an existing view with that name is replaced.
    part_col_lst : text spliced verbatim into both `partition(...)` and
        `distribute by (...)`, e.g. 'ctl_loading'.

    NOTE(review): the statement uses `select *`, and Hive matches the selected
    columns to the target table by position with the partition column(s) last.
    `sdf` is assumed to already have that column order; it is not checked here.
    """
    # registerTempTable has been deprecated since Spark 2.0; createOrReplaceTempView
    # is its documented replacement and has the same replace-if-exists behavior.
    sdf.createOrReplaceTempView(tmp_tbl)

    hive.sql("""
    insert overwrite table {schema}.{tbl}
    partition({part_col})
    select * from {tmp_tbl}
    distribute by ({part_col})
    """.format(schema=conn_schema,
               tbl=target_tbl,
               tmp_tbl=tmp_tbl,
               part_col=part_col_lst)
            )

def collectRowsByIndex(i, it, indxs):
    '''Return the items of iterator `it` as a list when `i` is in `indxs`, else [].

    `it` is only consumed when `i` is selected. The (index, iterator) argument
    shape presumably targets a per-partition callback; no caller is visible in
    this notebook.
    '''
    return list(it) if i in indxs else []

In [6]:
conn_schema = 'sbx_team_digitcamp'
table_name = 'ga_cid_sbbol_inn_update'
# (column name, Hive type) pairs that form the PARTITIONED BY clause.
part_tupl_lst = [('ctl_loading', 'bigint')]
part_tupl_str = ', '.join("{} {}".format(col, _type) for col, _type in part_tupl_lst)

# Start from a clean slate: the table is re-created in the next cell.
hive.sql("drop table if exists {schema}.{tbl} purge".format(schema=conn_schema, tbl=table_name))

# Exclude partition columns by exact, case-insensitive name. The previous test
# (`col.lower() not in part_tupl_lst[0][0]`) was a substring check against the string
# 'ctl_loading', so a column named e.g. 'ctl' or 'loading' would have been dropped
# too, and only the first partition column was considered.
part_col_names = {col.lower() for col, _type in part_tupl_lst}
insert = ', '.join("{} {}".format(col, _type)
                   for col, _type in sdf.dtypes
                   if col.lower() not in part_col_names)

In [7]:
# Create the partitioned table: `insert` holds the non-partition column definitions
# and `part_tupl_str` the partition column definitions, both built in the previous cell.
createSDF(conn_schema, target_tbl=table_name, insert=insert, part_tupl_lst=part_tupl_str)

# Map Whole SDF into Partitions 

In [10]:
# Load all rows of `sdf` into the partitioned table. `sdf` is registered as the temp
# view 'tmp_ga_cid_inn', and 'ctl_loading' is passed without a value, so the
# partition of each row is presumably taken from its own ctl_loading value.
insertToSDF(sdf,
            conn_schema='sbx_team_digitcamp',
            tmp_tbl='tmp_ga_cid_inn', 
            target_tbl='ga_cid_sbbol_inn_update', 
            part_col_lst='ctl_loading')

## Exchange Partitions Between Tables

In [None]:
# for part in part_diff:
#     hive.sql('''ALTER TABLE {schema}.{tbl0} EXCHANGE PARTITION (ctl_loading='{prt}') WITH TABLE {schema}.{tbl1}'''\
#              .format(schema=conn_schema, 
#                      tbl0 ='ga_cid_sbbol_inn_update',      
#                      tbl1 ='ga_cid_sbbol_inn',
#                      prt=part
#                     )
#             )

## Merge Two Partitioned Tables via HDFS

In [16]:
import subprocess

In [24]:
# HDFS base directory (trailing slash required): the copy loop below appends
# '<table name>/<partition spec>/' to it for both the source and the target table.
data_path = 'hdfs://clsklsbx/user/team/team_digitcamp/hive/'

In [25]:
# Partitions of the freshly loaded table, largest numeric partition value first.
# The __HIVE_DEFAULT_PARTITION__ entry is skipped: its value would break the int() sort key.
parts_from = sorted(
    (row
     for row in hive.sql("show partitions {}.{}".format('sbx_team_digitcamp','ga_cid_sbbol_inn_update')).collect()
     if not row['partition'].endswith('__HIVE_DEFAULT_PARTITION__')),
    key=lambda row: int(row['partition'].split('=')[-1]),
    reverse=True,
)

In [26]:
# Reduce each entry to its partition spec string (e.g. 'ctl_loading=11482743').
# Entries that are already strings pass through unchanged, so re-running this cell
# no longer fails with a TypeError on str['partition'].
parts_from = [part if isinstance(part, str) else part['partition'] for part in parts_from]
parts_from = [part for part in parts_from if not part.endswith('__HIVE_DEFAULT_PARTITION__')]

In [34]:
# Partition spec strings of the target table, largest numeric partition value first.
# Plain strings are stored instead of Row objects: `parts_from` is a list of strings
# by the time the two are compared, and a Row never equals a str, so with Rows here
# the set difference reported every source partition as missing.
parts_to = [row['partition']
            for row in hive.sql("show partitions {}.{}".format('sbx_team_digitcamp','ga_cid_sbbol_inn')).collect()
            if not row['partition'].endswith('__HIVE_DEFAULT_PARTITION__')]
parts_to = sorted(parts_to, reverse=True, key=lambda spec: int(spec.split('=')[-1]))
# parts_to = sorted(parts_to,reverse=True)

In [35]:
def _partition_spec(entry):
    '''Return the 'col=value' text of a SHOW PARTITIONS Row, or `entry` itself if it is already a string.'''
    return entry if isinstance(entry, str) else entry['partition']

# Compare spec strings on both sides. Mixing Row objects with strings makes the
# set difference a no-op, because a Row never compares equal to a str.
part_diff = {_partition_spec(p) for p in parts_from} - {_partition_spec(p) for p in parts_to}
part_diff = [spec.split('=')[-1] for spec in part_diff]
# Numeric ordering: plain string sorting misorders values of different length ('9' > '10').
part_diff = sorted(part_diff, key=int, reverse=True)
part_diff

['11482743',
 '11481779',
 '11461830',
 '11461026',
 '11459968',
 '11441601',
 '11422415',
 '11404396',
 '11403618',
 '11390067',
 '11389420',
 '11388762',
 '11373411',
 '11372608',
 '11371633',
 '11354808',
 '11353676',
 '11352959',
 '11352321',
 '11351272',
 '11332912',
 '11332186',
 '11331229',
 '11311484',
 '11292575',
 '11291830',
 '11275461',
 '11274756',
 '11273921',
 '11259526',
 '11259276',
 '11241503',
 '11240862',
 '11239743',
 '11221260',
 '11220586']

In [36]:
# Register every partition value from `part_diff` on the target table;
# IF NOT EXISTS makes the statement safe to repeat.
add_partition_sql = '''ALTER TABLE sbx_team_digitcamp.ga_cid_sbbol_inn ADD IF NOT EXISTS PARTITION(ctl_loading='{}')'''
for part_num in part_diff:
    print('ADDING PARTITION: {}...'.format(part_num))
    hive.sql(add_partition_sql.format(part_num))

ADDING PARTITION: 11482743...
ADDING PARTITION: 11481779...
ADDING PARTITION: 11461830...
ADDING PARTITION: 11461026...
ADDING PARTITION: 11459968...
ADDING PARTITION: 11441601...
ADDING PARTITION: 11422415...
ADDING PARTITION: 11404396...
ADDING PARTITION: 11403618...
ADDING PARTITION: 11390067...
ADDING PARTITION: 11389420...
ADDING PARTITION: 11388762...
ADDING PARTITION: 11373411...
ADDING PARTITION: 11372608...
ADDING PARTITION: 11371633...
ADDING PARTITION: 11354808...
ADDING PARTITION: 11353676...
ADDING PARTITION: 11352959...
ADDING PARTITION: 11352321...
ADDING PARTITION: 11351272...
ADDING PARTITION: 11332912...
ADDING PARTITION: 11332186...
ADDING PARTITION: 11331229...
ADDING PARTITION: 11311484...
ADDING PARTITION: 11292575...
ADDING PARTITION: 11291830...
ADDING PARTITION: 11275461...
ADDING PARTITION: 11274756...
ADDING PARTITION: 11273921...
ADDING PARTITION: 11259526...
ADDING PARTITION: 11259276...
ADDING PARTITION: 11241503...
ADDING PARTITION: 11240862...
ADDING PAR

In [37]:
# Copy the files of every source partition directory into the directory of the same
# partition under the target table.
# NOTE(review): this walks all of `parts_from`, not only `part_diff`, and `-f`
# overwrites files that already exist in the target - confirm that is intended.
copy_failures = []
for ctl in tqdm_notebook(parts_from, total=len(parts_from)):

    hdfs_from = data_path+'ga_cid_sbbol_inn_update'+'/'+'{}/*'.format(ctl)
    hdfs_to   = data_path+'ga_cid_sbbol_inn'+'/'+'{}/'.format(ctl)

    # subprocess.run drains both pipes itself. subprocess.call with stdout=PIPE can
    # block once the pipe buffer fills, and its exit status was being thrown away,
    # so failed copies went unnoticed.
    proc = subprocess.run(['hdfs', 'dfs', '-cp', '-f', hdfs_from, hdfs_to],
                          stdin=subprocess.DEVNULL,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE)
    if proc.returncode != 0:
        copy_failures.append(ctl)
        err_text = re.sub(r"\s+", " ", proc.stderr.decode('utf-8', errors='replace')).strip()
        print('COPY FAILED for {} (exit code {}): {}'.format(ctl, proc.returncode, err_text))

# Best-effort semantics are kept (the loop never aborts), but failures are summarized.
if copy_failures:
    print('{} partition(s) were not copied: {}'.format(len(copy_failures), copy_failures))




In [38]:
# The files were copied straight on HDFS, bypassing Hive, so ask the metastore to
# re-sync the table's partition list with the directories now under its location.
hive.sql("msck repair table sbx_team_digitcamp.ga_cid_sbbol_inn")

DataFrame[]

## Checking for availability of newly added partitions

In [39]:
# Spot-check one of the partition values listed in `part_diff`. `.show()` is the
# patched helper from the first cell, so up to 10 rows come back via toPandas().
hive.sql("select * from sbx_team_digitcamp.ga_cid_sbbol_inn where ctl_loading=11482743").show()

Unnamed: 0,cid,replicationguid,sbboluserid,commonSegmentoUID,cu_id_sbbol,user_id,cu_inn,cu_kpp,crm_id,cu_okpo,locked,load_dt,ctl_loading
0,485287103.1479025,b89d535c-45b9-434d-97fa-3cbeef831385,3c25f094-1000-fc84-e054-90e2ba85d0c8,,485541,510966,10502224988,,1-9671H4,137177720,0,2021-09-13,11482743
1,2094633461.1629865,8a3c9bb7-d16c-490b-accb-d388c98f1f06,48b6e5af-67a0-40c8-a1be-0acaa838bcf1,,2757385,5454951,214005782,21643001.0,,22582936,0,2021-09-13,11482743
2,635760129.1631017,c27d04b3-b1a6-4707-b2cb-aa971c710354,89b20944-c8de-4ad8-8e6f-52fd89376fff,,1152065,8419343,214005782,21401001.0,,92800820,0,2021-09-13,11482743
3,1730984454.1631017,8a3c9bb7-d16c-490b-accb-d388c98f1f06,cada34ee-86d2-4702-9dbf-a6c58da563a5,,2757385,8938503,214005782,21643001.0,,22582936,0,2021-09-13,11482743
4,2119759164.1630988,c27d04b3-b1a6-4707-b2cb-aa971c710354,d49a6798-de8b-47d5-bdf0-965e51e00978,,1152065,7110499,214005782,21401001.0,,92800820,0,2021-09-13,11482743
5,521877725.16158,4dc140e1-2e2e-4aac-b743-07d41aa68bfc,3c25f0be-00ee-1d5b-e054-90e2ba9a7f6c,gd3htZPRj5Nm,1861252,2336438,23302757370,,,141198354,0,2021-09-13,11482743
6,245252727.163022,41d1a460-7297-4b3c-8320-23f2426c1d0c,3c25f0ad-e713-1d5b-e054-90e2ba9a7f6c,,2024473,2589125,25802481269,,1-2HU4TPK,155835394,0,2021-09-13,11482743
7,1818264086.16176,d70ced9c-c48b-4ccc-ab0e-8c003c148bef,3c25f0b4-b9d5-1d5b-e054-90e2ba9a7f6c,xAhbylouAW6M,984922,2750671,26301645165,,1-154OS95,162193521,0,2021-09-13,11482743
8,1639127343.1630895,a856dbe0-09de-4b96-af62-4e6384f8b8f7,3c25f0b5-bc0e-1d5b-e054-90e2ba9a7f6c,,1266035,1440960,26511486810,,1-1POMZNG,91308925,0,2021-09-13,11482743
9,1663923000.159888,ec7f80bd-867b-422a-aca1-5401d614709d,3cfb6875-3157-4f23-be1a-3d7fcba8d2a3,,2022199,9727797,26705473539,,,193957833,0,2021-09-13,11482743
