In [None]:
import os
import sys
import numpy as np
import pandas as pd
from datetime import datetime as dt
from tqdm import tqdm

# Spark Imports
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark import HiveContext
from pyspark import SQLContext

from pyspark.sql import Window
from pyspark.sql import functions as f_
from pyspark.sql.functions import udf
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import lpad
from pyspark.sql.functions import lit
from pyspark.sql.functions import desc, asc
 
# Environment for the Cloudera Spark2 parcel and for the Python interpreter
# used by both the driver and the executors.
# NOTE: pyspark is already imported earlier in this cell, so the sys.path
# entries added below cannot influence that import; they only matter for
# imports that happen afterwards.
os.environ['JAVA_HOME'] = '/usr/java/jdk1.8.0_171-amd64/jre'
os.environ['SPARK_HOME'] = '/opt/cloudera/parcels/SPARK2/lib/spark2'
os.environ['LD_LIBRARY_PATH'] = '/opt/python/virtualenv/jupyter/lib'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/cloudera/parcels/PYENV.ZNO20008661/bin/python'
os.environ['PYSPARK_PYTHON'] = '/opt/cloudera/parcels/PYENV.ZNO20008661/bin/python'

# Make the parcel's PySpark sources and its bundled py4j importable.
# The path is taken from SPARK_HOME so it is spelled out only once.
sys.path.insert(0, os.path.join(os.environ['SPARK_HOME'], 'python'))
sys.path.insert(0, os.path.join(os.environ['SPARK_HOME'],
                                'python/lib/py4j-0.10.7-src.zip'))

# Spark config

# 'true' / 'false' switch passed to spark.dynamicAllocation.enabled.
# NOTE(review): the min/max executor settings below are dynamic-allocation
# options, so they presumably have no effect while this stays 'false'.
allocationResourceStatus = 'false'

# Set up the Spark configuration.
# - master 'yarn' + deployMode 'client' is the Spark 2 spelling of the
#   deprecated 'yarn-client' master URL.
# - the histogram key is 'spark.sql.statistics.histogram.enabled'; the
#   previous spelling without the trailing 'd' is not a Spark option.
conf = (
    SparkConf()
    .setMaster('yarn')
    .set('spark.submit.deployMode', 'client')
    .setAppName('stg_data_profoling')
    .set('spark.sql.statistics.histogram.enabled', 'true')
    .set('spark.dynamicAllocation.enabled', allocationResourceStatus)
    .set('spark.dynamicAllocation.executorIdleTimeout', '1m')
    .set('spark.sql.broadcastTimeout', '3000')
    .set('spark.sql.autoBroadcastJoinThreshold', '-1')
    .set('spark.dynamicAllocation.minExecutors', 20)
    .set('spark.dynamicAllocation.maxExecutors', 100)
    .set('spark.yarn.executor.memoryOverhead', '1g')
    .set('spark.executor.cores', 10)
    .set('spark.driver.memory', '20g')
    .set('spark.executor.memory', '20g')
    .set('mapred.input.dir.recursive', 'true')
    .set('spark.sql.parquet.binaryAsString', 'true')
    .set('spark.sql.hive.convertMetastoreParquet', 'false')
)
        

In [None]:
# Reuse a running SparkContext if there is one, otherwise start it with `conf`.
sc = SparkContext.getOrCreate(conf=conf)

# HiveContext is deprecated in Spark 2; a SparkSession with Hive support is the
# documented replacement and offers the same .sql / .createDataFrame entry
# points that the rest of the notebook calls on `spark`.
spark = (
    SparkSession.builder
    .config(conf=conf)
    .enableHiveSupport()
    .getOrCreate()
)

In [None]:
# TODO: implement functions for the following:
# 0 - data types and sizes | specify which columns to treat as categories!
# 0.1 - data frequency: how much data is present, how much is missing / mismatched
# 1 - distributions (median and mean, to understand the skew)
# 1.1 - min, lower quantile, median, upper quantile, maximum (+ top 10 values)
# 1.2 - compute the statistics (and understand them along the way) -> count-min-sketch, T-digest
# 2 - time -> aggregate from hourly up to daily and get the counts | change over time, comparison with the previous date
# 3 - plotting - histograms, bins, outliers




class Profiller:
    """Profile selected columns of a Spark DataFrame and optionally plot them.

    Main entry points:

    * ``getNumiricalStats`` - one summary table holding the ``describe()``
      rows plus distinct counts, percentiles, filled / missing counts and
      shares, skewness and kurtosis;
    * ``corrViz`` - heatmap of pairwise correlations;
    * ``numvericalViz`` / ``distViz`` / ``gridPlotViz`` - per-column plots.

    Attributes:
        spark: object used for ``sql`` and ``createDataFrame`` calls.
        df: the DataFrame being profiled.
        mostFreqItems: cached result of ``df.stat.freqItems`` over all columns.
        countTypes: ``Counter`` of the dtype strings found in ``df.dtypes``.
        percentile: percentiles (0-100) reported by ``getNumiricalStats``.
        viz: when true, ``getNumiricalStats`` also draws the per-column plots.
        calcCols: list of column names that are profiled.
        res_df: ``df`` restricted to ``calcCols``.
    """

    def __init__(self, spark, dfORsqlString, viz=False, cols=None, ):
        """Store the inputs and pre-compute the cheap metadata.

        Args:
            spark: Spark entry point exposing ``sql`` and ``createDataFrame``.
            dfORsqlString: either a DataFrame, or a SQL string that is run
                through ``spark.sql`` and persisted in memory.
            viz: draw plots from ``getNumiricalStats`` when true.
            cols: column name or iterable of column names to profile;
                all columns of the DataFrame when ``None``.
        """
        from collections import Counter

        self.spark = spark

        if isinstance(dfORsqlString, str):
            self.df = (self.spark
                       .sql(dfORsqlString)
                       .persist(pyspark.StorageLevel.MEMORY_ONLY))
        else:
            self.df = dfORsqlString

        self.mostFreqItems = (self.df.stat
                              .freqItems(cols=self.df.columns)
                              .persist(pyspark.StorageLevel.MEMORY_ONLY))

        self.countTypes = Counter(dtype for _, dtype in self.df.dtypes)

        self.percentile = [25, 50, 75]

        self.viz = viz

        # Always keep a real list: getNumiricalStats and corrViz iterate it,
        # and a bare string would otherwise be treated as a sequence of chars.
        if cols is None:
            self.calcCols = self.df.columns
        elif isinstance(cols, str):
            self.calcCols = [cols]
        else:
            self.calcCols = list(cols)

        # Defined here (lazily evaluated by Spark) so that the helper methods
        # reading self.res_df also work before getNumiricalStats has been run.
        self.res_df = self.df.select(self.calcCols)

        # TODO: drop the columns that do not belong to any category

    def _groupToDo(self):
        """Placeholder: split numeric columns into groups and plot per group."""
        pass

    @staticmethod
    def _inlinePlots():
        """Switch matplotlib to the inline backend when running under IPython.

        Does the same as the ``%matplotlib inline`` magic, but as plain Python
        so the class stays importable outside a notebook, where
        ``get_ipython`` is undefined and the call is skipped.
        """
        try:
            get_ipython().run_line_magic('matplotlib', 'inline')
        except NameError:
            pass

    def _summaryRow(self, label, values):
        """Build a one-row DataFrame ``(summary=label, <col>=value, ...)``.

        Args:
            label: text stored in the ``summary`` column.
            values: one value per column of ``self.res_df``, in column order.
        """
        row_df = self.spark.createDataFrame([pyspark.sql.Row("summary")(label)])
        for name, value in zip(self.res_df.columns, values):
            row_df = row_df.withColumn(name, pyspark.sql.functions.lit(value))
        return row_df

    def getNumiricalStats(self):
        """Compute the summary table for ``calcCols``.

        The result has a ``summary`` label column followed by one column per
        profiled column, and these rows in order: the rows produced by
        ``describe()``, ``count_uniq``, one row per entry of
        ``self.percentile``, ``is_fill``, ``is_empty``, ``is_fill_prc``,
        ``is_empty_prc``, ``skewness`` and ``kurtosis``.

        ``is_fill_prc`` / ``is_empty_prc`` are the shares of filled / missing
        values among all rows (filled + missing); they are null when the
        DataFrame has no rows.

        When ``self.viz`` is true the per-column plots are drawn as well.

        Returns:
            The summary DataFrame, persisted in memory.
        """
        self.res_df = self.df.select(self.calcCols)
        columns = self.res_df.columns

        # Step 1 - basic statistics.
        # NOTE(review): the rows appended below assume describe() reports every
        # column of res_df; confirm for non-numeric / non-string column types.
        result = self.res_df.describe()

        # Step 2 - number of distinct values per column.
        result = result.union(
            self._summaryRow('count_uniq', [self._getUniq(c) for c in columns]))

        # Step 3 - percentiles, one row per requested percentile.
        for p in self.percentile:
            result = result.union(
                self._summaryRow('{}'.format(p),
                                 [self._getPercentiles(c, p) for c in columns]))

        # Step 4 - filled / missing counts. The sums are null on an empty
        # DataFrame, which is reported as 0.
        filled_row = self.res_df.agg(
            *[self._fillingInColCounting(c) for c in columns]).collect()[0]
        empty_row = self.res_df.agg(
            *[self._NAInColCounting(c) for c in columns]).collect()[0]
        filled = [0 if v is None else int(v) for v in filled_row]
        empty = [0 if v is None else int(v) for v in empty_row]

        # Step 5 - shares of filled / missing values. The denominator is the
        # total row count (filled + missing), not describe()'s 'count', which
        # only counts non-null values and would make the filled share always 1.
        totals = [f + e for f, e in zip(filled, empty)]
        filled_share = [f / float(t) if t else None for f, t in zip(filled, totals)]
        empty_share = [e / float(t) if t else None for e, t in zip(empty, totals)]

        for label, values in (('is_fill', filled),
                              ('is_empty', empty),
                              ('is_fill_prc', filled_share),
                              ('is_empty_prc', empty_share)):
            result = result.union(self._summaryRow(label, values))

        # Step 6 - shape of the distribution. Each statistic gets its own row;
        # writing both into one two-row frame would leave the last value
        # (kurtosis) in both rows.
        shape = [self._skewness(c) for c in columns]
        result = result.union(self._summaryRow('skewness', [s[0] for s in shape]))
        result = result.union(self._summaryRow('kurtosis', [s[1] for s in shape]))

        result = result.persist(pyspark.StorageLevel.MEMORY_ONLY)

        if self.viz:
            for col in self.calcCols:
                # NOTE(review): only the DISTINCT non-null values are pulled to
                # the driver and they are truncated to int, so the histograms
                # below describe distinct values rather than row frequencies.
                x = (self.res_df
                     .select(col)
                     .filter(pyspark.sql.functions.col(col).isNotNull())
                     .distinct()
                     .toPandas()[col].astype('int'))

                self.numvericalViz(x, col)
                self.distViz(x, col)

                self.gridPlotViz(col)

        return result

    def _changeToCat(self, df, catList):
        """Cast categorical columns to string.

        Every column named in ``catList`` plus every column whose dtype is
        already ``string`` is treated as categorical. ``catList`` itself is
        not modified.

        Args:
            df: DataFrame to convert.
            catList: names of the columns to treat as categories.

        Returns:
            ``(converted_df, other_columns)`` where ``other_columns`` lists the
            columns that are not categorical.

        Raises:
            ValueError: if a name in ``catList`` is not a column of ``df``.
        """
        catPatterns = ['string',]

        # Work on a copy so the caller's list is left untouched.
        categorical = list(catList)
        for name, dtype in df.dtypes:
            if dtype in catPatterns and name not in categorical:
                categorical.append(name)

        # Validate before building any new column.
        for name in categorical:
            if name not in df.columns:
                raise ValueError("Name {} does not exist in columns".format(name))

        for name in categorical:
            df = df.withColumn(
                name,
                pyspark.sql.functions.col(name).cast(pyspark.sql.types.StringType()))

        return df, [name for name in df.columns if name not in categorical]

    def _fillingInColCounting(self, col, nanAsNull = False):
        """Aggregate expression counting the filled values of ``col``.

        A value is filled when it is not null and, if ``nanAsNull`` is true,
        also not NaN. The expression is aliased to ``col``.
        """
        pred = pyspark.sql.functions.col(col).isNotNull()
        if nanAsNull:
            pred = pred & ~pyspark.sql.functions.isnan(col)
        return pyspark.sql.functions.sum(pred.cast(pyspark.sql.types.IntegerType())).alias(col)

    def _NAInColCounting(self, col, nanAsNull = False):
        """Aggregate expression counting the missing values of ``col``.

        A value is missing when it is null or, if ``nanAsNull`` is true, NaN,
        which mirrors ``_fillingInColCounting``. The expression is aliased to
        ``col``.
        """
        pred = pyspark.sql.functions.col(col).isNull()
        if nanAsNull:
            pred = pred | pyspark.sql.functions.isnan(col)
        return pyspark.sql.functions.sum(pred.cast(pyspark.sql.types.IntegerType())).alias(col)

    def _getUniq(self, col):
        """Return the number of distinct values of ``col`` as an int."""
        distinct_count = pyspark.sql.functions.countDistinct(col)
        return int(self.res_df.select(distinct_count).collect()[0][0])

    def _getPercentiles(self, col, p):
        """Return the ``p``-th percentile (0-100) of the non-null values of ``col``.

        The non-null values are collected to the driver, converted to float
        and passed to ``numpy.percentile``. The result is returned directly as
        a Python float; it is no longer round-tripped through a FloatType
        DataFrame, which cost an extra Spark job and float32 precision.

        Returns:
            The percentile as a float, or ``None`` when the column has no
            non-null values.
        """
        rows = (self.res_df
                .select(col)
                .filter(pyspark.sql.functions.col(col).isNotNull())
                .collect())
        values = [float(r[col]) for r in rows]
        if not values:
            return None
        return float(np.percentile(values, p))

    def _skewness(self, col):
        """Return a row ``(skewness, kurtosis)`` over the non-null values of ``col``."""
        return (self.res_df
                .filter(pyspark.sql.functions.col(col).isNotNull())
                .select(pyspark.sql.functions.skewness(col),
                        pyspark.sql.functions.kurtosis(col))
                .collect()[0])

    def _todo(self):
        """Placeholder for a chi-square test; currently it only runs the imports."""
        from pyspark.ml.stat import ChiSquareTest
        from pyspark.ml.linalg import Vectors
        from pyspark.ml.feature import VectorAssembler
        # TODO: assemble the features into a vector
        # r.pValues
        # r.degreesOfFreedom
        # r.statistics

        # some_feature = VectorAssembler(inputCols=[],
                                         #outputCol="")
        # df = some_feature.transform(data)

        #r = ChiSquareTest.test(res_df(some_feature?), "feature", "label") #.head()

    def corrViz(self):
        """Draw a heatmap of pairwise correlations between ``calcCols``.

        Correlations come from ``self.df.stat.corr``. Each pair is computed
        once and mirrored, and the matrix is assembled on the driver in
        pandas. The diagonal is set to 0, as in the earlier implementation.
        NOTE(review): presumably 0 is used instead of 1 so self-correlation
        does not dominate the colour scale - confirm.
        """
        import seaborn as sns

        names = list(self.calcCols)
        matrix = pd.DataFrame(0.0,
                              index=pd.Index(names, name='stats'),
                              columns=names)

        for pos, first in enumerate(names):
            for second in names[pos + 1:]:
                value = self.df.stat.corr(col1=first, col2=second)
                matrix.loc[first, second] = value
                matrix.loc[second, first] = value

        sns.heatmap(matrix)

    def numvericalViz(self, x, col):
        """Plot two 10-bin histograms of ``x``: percentage (left), count (right).

        Args:
            x: numeric values to plot (a pandas Series in ``getNumiricalStats``).
            col: column name used for the axis label and the title.

        The bins span from ``min(0, x.min())`` to ``x.max()`` inclusive, so
        the largest values are part of the last bin and negative values are
        not dropped. Tick labels mark the bin edges. Nothing is drawn for
        empty input.
        """
        import matplotlib.pyplot as plt
        self._inlinePlots()

        if len(x) == 0:
            return

        lower = min(0, x.min())
        upper = x.max()
        if upper <= lower:
            # Degenerate range (a single distinct value): widen it so the
            # edges are strictly increasing.
            upper = lower + 1
        edges = np.linspace(lower, upper, 11)

        # Each observation weighs 100 / n, so the bars sum to 100 percent.
        share, _ = np.histogram(x, edges, weights=np.full(len(x), 100. / len(x)))
        count, _ = np.histogram(x, edges)

        # Bars are centred on 0, 1, 2, ... with width 1, so the bin edges sit
        # at -0.5, 0.5, 1.5, ... - one tick (and one label) per edge.
        tick_positions = [i - 0.5 for i in range(len(edges))]
        tick_labels = ['{}'.format(int(edge)) for edge in edges]

        fig, axes = plt.subplots(1, 2, figsize=(10, 4))
        for ax, heights, ylabel in zip(axes, (share, count), ('percentage', 'count')):
            ax.bar(range(len(heights)), heights, width=1, alpha=0.8, ec='black', color='gold')
            ax.set_xticks(tick_positions)
            ax.set_xticklabels(tick_labels)
            ax.set_xlabel(col)
            ax.set_ylabel(ylabel)

        fig.suptitle('Histogram of {}: Left with percentage; Right with count'
                     .format(col), size=14)
        plt.show()

    def distViz(self, x, col):
        """Plot a box plot (left) and a violin plot (right) of ``x``.

        Args:
            x: numeric values to plot.
            col: column name used in the figure title.
        """
        import matplotlib.pyplot as plt
        import seaborn as sns
        self._inlinePlots()

        fig, (box_ax, violin_ax) = plt.subplots(1, 2, figsize=(10, 4))
        sns.boxplot(data=x, ax=box_ax)
        sns.violinplot(data=x, ax=violin_ax)

        fig.suptitle('Distribution of {}: Left box plot; Right violin plot'
                     .format(col), size=14)
        plt.show()

    def parter(self, splits):
        """Map bucket indices to readable interval labels.

        Args:
            splits: ordered bucket boundaries; ``-inf`` / ``+inf`` are
                rendered as ``-`` / ``+``.

        Returns:
            Dict ``{'0': '[a:b]', '1': '[b:c]', ...}`` with one entry per pair
            of neighbouring boundaries, keyed by the bucket index as a string.
        """
        patterns = {-float('inf') : "-", float('inf') : "+"}

        def render(bound):
            return patterns[bound] if bound in patterns else bound

        return {
            '{}'.format(idx): "[{}:{}]".format(render(low), render(high))
            for idx, (low, high) in enumerate(zip(splits, splits[1:]))
        }

    def gridPlotViz(self, col):
        """Bucketize ``col`` and draw a seaborn pair plot coloured by bucket.

        Bucket boundaries run from 0 up to (excluding) the column maximum in
        about ten equal integer steps, framed by ``-inf`` and ``+inf``. The
        step is at least 1 and at least one finite boundary is always present,
        so small or non-positive maxima no longer break the split list.
        Rows containing nulls are dropped and all values are cast to int
        before plotting. Nothing is drawn when the column has no non-null
        values.
        """
        import matplotlib.pyplot as plt
        import seaborn as sns
        from pyspark.ml.feature import Bucketizer
        self._inlinePlots()

        raw_max = self.res_df.select(pyspark.sql.functions.max(col)).collect()[0][0]
        if raw_max is None:
            return
        mx = int(raw_max)

        step = max(1, mx // 10)
        inner = list(range(0, mx, step)) or [0]
        spl = [-float('inf'),] + inner + [float('inf'),]

        bins = self.parter(spl)
        bucket_col = '{}_bucket'.format(col)

        cnt_bucketizer = Bucketizer(splits=spl,
                                    inputCol=col,
                                    outputCol=bucket_col)

        bucketed = cnt_bucketizer.setHandleInvalid("keep").transform(self.res_df)

        sns.set(style="ticks")

        # Every column (the bucket index included) becomes int, so str(index)
        # matches the keys produced by parter.
        d_f = bucketed.na.drop().toPandas().astype('int')
        d_f[bucket_col] = d_f[bucket_col].apply(lambda b: bins[str(b)])

        sns.pairplot(d_f, hue=bucket_col)
        plt.show()

       

In [None]:
# Build the profiler for four columns of the query result; viz=True makes
# getNumiricalStats draw the per-column plots as well.
# NOTE(review): the table name in the query looks like a placeholder -
# replace it with the real source table before running.
df = Profiller(
    spark=spark,
    dfORsqlString='SELECT * FROM *',
    viz=True,
    cols=['rnd', 'cnt_users', 'avg_cltv', 'cnt_opers'],
)

# Compute the summary table and print it.
r = df.getNumiricalStats()
r.show()

In [None]:
# Draw the pairwise-correlation heatmap for the profiled columns.
df.corrViz()

In [None]:
# Show the frequent-items table that the profiler computed over all columns at construction time.
df.mostFreqItems.show()