In [None]:
import os
import sys
 
import pyspark
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark import sql
from pyspark.sql.dataframe import DataFrame
 
from datetime import datetime as dt
from datetime import timedelta
from dateutil import relativedelta
import time
from time import clock
 
import pandas as pd
import numpy as np
 
# Notebook-wide pandas display settings: no cap on displayed columns,
# cell text up to 256 characters wide, floats rendered with three decimals.
pd.options.display.max_columns = None
pd.options.display.max_colwidth = 256
pd.options.display.float_format = lambda value: format(value, '.3f')
 
# for beautiful output, prototype-only!
def show(self, n=50):
    """Return the first ``n`` rows (default 50) converted with ``toPandas()``.

    Written as a method: ``self`` is the object that provides ``limit`` and
    ``toPandas``. Only the limited rows are converted.
    """
    head = self.limit(n)
    return head.toPandas()
 
# Monkeypatch: from here on, `.show()` on any Spark DataFrame in this kernel calls
# the `show` helper defined above (returns a pandas frame) instead of the built-in method.
pyspark.sql.dataframe.DataFrame.show = show

In [None]:
# Point this kernel at the cluster's Spark 2 installation before importing SparkSession.
os.environ['SPARK_MAJOR_VERSION'] = '2'
os.environ['SPARK_HOME'] = '/usr/sdp/3.4.0.1-1/spark2/'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'python'
os.environ['LD_LIBRARY_PATH'] = '/opt/python/virtualenv/jupyter/lib'
sys.path.insert(0, '/usr/sdp/3.4.0.1-1/spark2/python/')
sys.path.insert(0, '/usr/sdp/3.4.0.1-1/spark2/python/lib/py4j-0.10.7-src.zip')

from pyspark.sql import SparkSession

# Build (or reuse) the Hive-enabled session used by every cell below.
spark = (
    SparkSession
    .builder
    .master("yarn-client")
    .appName('template')
    .config("spark.local.dir", "sparktmp")

    # Dynamic allocation (needs the external shuffle service).
    .config("spark.dynamicAllocation.enabled", True)
    .config('spark.shuffle.service.enabled', True)
    .config("spark.dynamicAllocation.minExecutors", 2)
    .config("spark.dynamicAllocation.maxExecutors", 15)

    .config("spark.kryoserializer.buffer.max", "2040mb")

    # Hive behaviour.
    .config("spark.hadoop.hive.exec.dynamic.partition", True)
    .config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict")
    .config("spark.sql.hive.caseSensitiveInferenceMode", "NEVER_INFER")
    # .config("spark.sql.autoBroadcastJoinThreshold", -1)

    # Driver sizing.
    .config("spark.driver.maxResultSize", "6g")
    .config("spark.driver.memory", "8g")
    .config("spark.yarn.driver.memoryOverhead", "6048mb")

    # Executor sizing.
    .config("spark.executor.memory", "12g")
    .config("spark.executor.cores", 4)
    .config("spark.executor.instances", 12)
    # NOTE(review): the original repeated "spark.yarn.driver.memoryOverhead" = "6048mb"
    # here, directly after the executor settings. The duplicate was a no-op and has been
    # removed; if "spark.yarn.executor.memoryOverhead" was intended, add it explicitly.

    .enableHiveSupport()
    .getOrCreate()
)

In [None]:
# Bare expression: the notebook displays the session object created above.
spark

# Search in Hive

## Search database

In [None]:
# Substring that a database name must contain to be listed.
BASE_NAME_SHOULD_CONTAIN = 'prx'

(
    spark.sql('show databases')
    .filter(F.col('databaseName').contains(BASE_NAME_SHOULD_CONTAIN))
    .show()
)

# Tables of one particular database (last expression, so this is what the cell displays).
spark.catalog.listTables('prx_sbof_2_selfservice_sbof')

## Search columns

In [None]:
# Substring to look for in column names (matched case-insensitively).
COLUMN_NAME_SHOULD_CONTAIN = 'inn'
# Database whose tables are scanned.
scheme_nm = 'cib_custom_cib_p4d_pprb_ucpkb'

needle = COLUMN_NAME_SHOULD_CONTAIN.lower()
for table in spark.catalog.listTables(scheme_nm):
    found_in_table = False
    for column in spark.catalog.listColumns(table.name, scheme_nm):
        column_name = column.name.lower()
        if needle in column_name:
            print('Found column {} in table {}'.format(column_name, table.name))
            found_in_table = True
    # Blank line after each table that produced at least one hit.
    if found_in_table:
        print()

## Search for joinable fields

In [None]:
# Work on a 5000-row slice of each table so the pairwise joins stay cheap.
# `limit` without an ordering does not say WHICH 5000 rows are taken.
a = spark.table("cib_custom_cib_p4d_pprb_ucpkb.party_equivalent_equivalent").limit(5000)
b = spark.table("cib_custom_cib_p4d_pprb_ucpkb.party_identificatation_inn").limit(5000)

# Column name -> Spark type string, looked up once instead of once per column pair.
a_types = dict(a.dtypes)
b_types = dict(b.dtypes)

for col_a in a.columns:
    for col_b in b.columns:
        # Only columns of the same type are join candidates.
        if a_types[col_a] != b_types[col_b]:
            continue

        join_result = (
            a.alias('a')
            .join(b.alias('b'),
                  on=(F.col('a.' + col_a) == F.col('b.' + col_b)))
            .count()
        )
        # A non-zero count means at least one value matches between the two columns.
        if join_result:
            print("type:", a_types[col_a],
                  "\ncolumns:", col_a, col_b, sep='\t')

# Partitions

## Which field is a table partitioned by?

In [None]:
# NOTE(review): `hive_get_partition_names` is not defined in the visible part of this
# notebook; it has to be defined or imported before this cell can run.
hive_get_partition_names('sbx_t_team_ds_kb_sme.03_mart_antifraud')

## Plot partition sizes

In [None]:
def show_partition_skew(df: DataFrame):
    """Draw a line chart of the row count per Spark partition of ``df``.

    :param df: Spark DataFrame whose partitioning is inspected
    :return: None; the chart is produced by the pandas ``plot`` call
    """
    rows_per_partition = (
        df.withColumn('partition_id', F.spark_partition_id())
        .groupBy('partition_id')
        .count()
    )
    # One row per partition, so bringing the result to the driver is cheap.
    ordered = rows_per_partition.toPandas().sort_values('partition_id')
    ordered.plot(x='partition_id', y='count', kind='line')


show_partition_skew(spark.table('sbx_t_team_ds_kb_sme.03_mart_antifraud'))

## Generate multiple "SHOW CREATE TABLE"s

In [None]:
# Database and tables whose DDL is dumped.
schema = 'cib_internal_eks_ibs'
source_tables = ['z_ac_fin', 'z_depart', 'z_branch', 'z_ft_money', 'z_ps', 'z_intend_for',
                 'z_acc_product', 'z_type_acc', 'z_user']

# Explicit UTF-8 so DDL containing non-ASCII text does not depend on the locale.
with open('Accounts_show_create_table_for_sources.txt', 'w', encoding='utf-8') as f:
    for table in source_tables:
        # `SHOW CREATE TABLE` returns a single row with a single string column.
        # `.first()[0]` reads it without relying on the monkeypatched `DataFrame.show`.
        raw_text = spark.sql('show create table {}.{}'.format(schema, table)).first()[0]
        f.write("======================================\n"
                + "===========\t" + table + "\t=============\n"
                + "======================================\n"
                + raw_text + "\n")

# Analytics

In [None]:
# Template: replace the `...` placeholder with the other columns to keep.
# Parses the 'effectivefrom' strings with the pattern 'dd.MM.yyyy HH:mm:ss' into a column named 'create'.
df.select(..., F.to_timestamp('effectivefrom', 'dd.MM.yyyy HH:mm:ss').alias('create'))

In [None]:
# Distribution of string lengths of `egrul_org_id`, most frequent length first.
(
    df.withColumn('len', F.length('egrul_org_id'))
    .groupBy('len')
    .agg(F.count('len').alias('count'))
    .orderBy(F.desc('count'))
    .show()
)

In [None]:
# Strip a leading run of lowercase Cyrillic letters (range а-я) and one optional dot
# from the start of `ul_adrs_house_num`. The result is not assigned, only displayed.
df.withColumn('ul_adrs_house_num',
                 F.regexp_replace('ul_adrs_house_num', r'^[а-я]*\.?', ''))

## 180 days window

In [None]:
from datetime import datetime, date, timedelta
import dateutil.relativedelta
import calendar
from calendar import monthrange
# Reporting period: from 180 days ago up to and including yesterday.
# `now` is taken once so both bounds are derived from the same instant.
now = datetime.now()
date_to = (now - timedelta(days=1)).strftime("%Y-%m-%d")
date_from = (now - timedelta(days=180)).strftime("%Y-%m-%d")

# Trailing window: the current row plus the 179 rows before it, per agreement, ordered by date.
# The lower bound must be negative ("preceding"): a positive 179 would mean "179 rows
# following", which lies after the upper bound `currentRow` and is rejected by Spark.
# NOTE(review): this counts ROWS, not calendar days — assumes one row per agreement
# per day; confirm against the source view.
window_over = (
    Window.partitionBy('agr_cred_id')
    .orderBy('gregor_dt')
    .rowsBetween(-179, Window.currentRow)
)

spark.catalog.refreshTable("prx_igntv1_dwh_slcl.v_agr_cred")
agr_cred = (
    spark
    .table("prx_igntv1_dwh_slcl.v_agr_cred")
    .selectExpr(
        "host_agr_cred_id as contract_id",
        "gregor_dt_part as report_dt",
        'gregor_dt',
        'restruct_flag',
        'last_restruct_dt as rest_dt',
        'agr_cred_id',
        'agr_num',
        'cust_id',
        'debt_ovr_dt',
        'intrst_ovr_dt',
        'expiration_dt as contract_close_dt',
        'ovr_days_cnt'
    )
    .filter(F.col('report_dt').between(date_from, date_to))
    .filter(F.col('contract_id') != "-1")
    # Number of rows inside the window whose `ovr_days_cnt` is positive.
    .withColumn('ovr_days_180',
                F.sum(F.when(F.col('ovr_days_cnt') > 0, 1).otherwise(0)).over(window_over))
)

## Read Excel

In [None]:
# Read the workbook with pandas on the driver, then turn the pandas frame into a Spark DataFrame.
spark.createDataFrame(pd.read_excel('fraud.xlsx'))

## Distinct by field

In [None]:
# Distinct values as a list of Row objects.
result.select('ul_status_nm').distinct().collect()
# Same, unwrapped into a plain list of values:
[row['ul_status_nm'] for row in result.select('ul_status_nm').distinct().collect()]

## Count nulls

In [None]:
# NaN check: `isnan` matches NaN values, which is a different test from `isNull` below.
df.filter(F.isnan('request_num')).count()

# NULL check on a single column.
(
    spark.table('prx_sbof_3_selfservice_sbof.dd_calendar')
    .filter(F.col('pymondaystart').isNull())
    .count()
)

### By all fields

In [None]:
# One aggregate per column: `when` without `otherwise` yields NULL for non-matching
# rows and `count` skips NULLs, so each aggregate is the number of NULL cells.
null_count_exprs = [
    F.count(F.when(F.col(name).isNull(), name)).alias(name)
    for name in df.columns
]
# Transposed so the columns of `df` are listed as rows.
df.select(null_count_exprs).show().T

### In percents

In [None]:
# NULL count per column, as in the previous cell ...
null_counts_pdf = df.select(
    [F.count(F.when(F.col(name).isNull(), name)).alias(name) for name in df.columns]
).show()
# ... transposed and divided by the total row count to get percentages.
total_rows = df.count()
null_counts_pdf.T.rename(columns={0: '% Nulls'}).div(total_rows).mul(100)

## Count non-null

In [None]:
# Number of rows left after `na.drop()` with its default arguments
# (per the PySpark docs the default drops a row if ANY column is null).
table.na.drop().count()

## Delete columns which end with _x

In [None]:
# Collect the names carrying the `_x` suffix, then drop them all in one call.
x_suffixed = [name for name in result.columns if name.endswith('_x')]
result = result.drop(*x_suffixed)

## Cell value in list?

In [None]:
# Number of rows whose `org_segment` equals one of the listed values.
result.filter(result.org_segment.isin('Микро', 'Малые', 'Малый')).count()

## .value_counts()

In [None]:
# Rows per `org_segment`, with the count column renamed to 'amount'.
# `.alias()` on a DataFrame names the DataFrame itself, not the column, so the rename
# has to be done with `withColumnRenamed`.
(
    result.groupby('org_segment')
    .count()
    .withColumnRenamed('count', 'amount')
    .show()
)

# Rows per `ul_head_inn`, most frequent first.
(
    temp_df.groupBy('ul_head_inn').count()
    .orderBy(F.desc('count'))
    .show()
)

## Compare x rows by columns

In [None]:
import pandas as pd

import pyspark
from pyspark.sql import functions as F
from pyspark.sql.dataframe import DataFrame
 
def show_diff_in_rows(df: DataFrame, sort_by: str) -> pd.DataFrame:
    """Show the columns whose values differ between the rows of a small DataFrame.

    Filter ``df`` down to the few rows you want to compare before calling. Rows are
    numbered 1..n in ``sort_by`` order, every value is cast to string, and the frame
    is pivoted so each input column becomes one output row.

    A column is kept only when EVERY pair of consecutive rows (1 vs 2, 2 vs 3, ...)
    differs in it. The comparison is null-safe, so NULL vs. a value counts as a
    difference and NULL vs. NULL does not.

    Uses the notebook-level ``spark`` session.

    :param df: Spark DataFrame with a handful of rows (the original note says < 7)
    :param sort_by: name of the column that defines the row order
    :return: pandas DataFrame with ``col_name`` plus one ``string_<i>`` column per row
    :raises ValueError: if ``df`` has fewer than two rows (nothing to compare)
    """

    def to_long(row: DataFrame, by: list) -> DataFrame:
        """Unpivot a one-row frame into (by..., col_name, val) rows.

        :param row: Spark DataFrame containing a single row
        :param by: names of the identifying columns that are kept as-is
        :return: one output row per non-``by`` column, holding its name and value
        """
        cols, dtypes = zip(*((c, t) for (c, t) in row.dtypes if c not in by))
        # F.array needs homogeneous element types; the caller casts everything to string.
        assert len(set(dtypes)) == 1, "All columns have to be the same type"

        kvs = F.explode(F.array([
            F.struct(F.lit(c).alias("col_name"), F.col(c).alias('val')) for c in cols
        ])).alias('kvs')
        return row.select(by + [kvs]).select(by + ["kvs.col_name", "kvs.val"])

    df = df.withColumn('r_num', F.row_number().over(Window.orderBy(sort_by)))
    # Convert everything to string so to_long() can put all values into one array.
    df = df.select([F.col(c).cast("string") for c in df.columns])

    # Count once: every count() is a separate Spark job.
    n_rows = df.count()
    if n_rows < 2:
        raise ValueError(
            "show_diff_in_rows needs at least two rows to compare, got {}".format(n_rows))

    df_lst = [to_long(df.filter(F.col("r_num") == i), ["r_num"])
              .withColumnRenamed('val', 'string_' + str(i))
              for i in range(1, n_rows + 1)]

    # Seed frame with a single column listing every column name of the input df.
    accum = spark.createDataFrame(
        data=[(r[0],) for r in df_lst[0].select('col_name').collect()],
        schema=['col_name'])
    for elem in df_lst:
        accum = accum.join(elem.drop("r_num"), on="col_name", how="inner")

    # Keep only the columns in which each consecutive pair of rows differs.
    for i in range(1, n_rows):
        left = F.col("string_" + str(i))
        right = F.col("string_" + str(i + 1))
        accum = accum.filter(~left.eqNullSafe(right))

    # Convert explicitly instead of relying on a monkeypatched DataFrame.show().
    return accum.toPandas()
 
# Example usage: compare the rows stored for one organisation, ordered by 'effectiveFrom'.
egrul_rows = (
    spark.table('cib_custom_cb_akm_integrum.ul_organization_egrul')
    .filter(F.col('ul_inn') == '3703023078')
)
show_diff_in_rows(egrul_rows, 'effectiveFrom')

# Saving and deleting

## Save

In [None]:
# Write `df` as a parquet-backed table, replacing the table if it already exists.
df.write.saveAsTable("sbx_t_team_ds_kb_sme.companies_by_ul_head",
                     format="parquet",
                     mode="overwrite")

## Resave

In [None]:
# Copy a table into the sandbox schema as parquet, overwriting any existing copy.
spark.table('cib_custom_cib_ml360.u_client_products').write.saveAsTable(
    'sbx_t_team_ds_kb_sme.u_client_products',
    format='parquet',
    mode='overwrite',
)

## With partitioning

In [None]:
# Same as a plain save, but the output is partitioned by the `part_dt` column.
spark.table('sbx_t_team_ds_kb_sme.partner_cred').write.saveAsTable(
    'sbx_t_team_ds_kb_sme.partner_cred_backup',
    format='parquet',
    mode='overwrite',
    partitionBy="part_dt",
)

## Delete

In [None]:
# NOTE(review): these look like three ALTERNATIVES rather than a sequence — after the
# table has been dropped, a following drop of the same table has nothing left to drop.
# `hive_drop_table` is not defined in the visible part of this notebook.
spark.sql("truncate table sbx_t_team_ds_kb_sme.table") # removes all the rows
hive_drop_table('sbx_t_team_ds_kb_sme.table') # author's note: deletes the meta info
spark.sql("drop table sbx_t_team_ds_kb_sme.table purge") # author's note: deletes everything, including parquet files!!!

# Versions

In [None]:
from sys import version, version_info
 
print(version)
print(version_info)
 
!which python3
 
from sys import version, path
 
print(version)
import defusedxml
print('defusedxml', defusedxml._version_)
import setuptools
print('setuptools', setuptools._version_)
import exchangelib
print('exchangelib', exchangelib._version_)
import pip
print('pip', pip._version_)
import ssl