In [1]:
from pyspark.sql import SparkSession
import pyspark
import sys

try:
    from StringIO import BytesIO
except ImportError:
    from io import BytesIO

try:
    from urllib import quote
except ImportError:
    from urllib.parse import quote

import base64
from itertools import product

import matplotlib
matplotlib.use('Agg')


#Known imports
import databricks.koalas as ks
from IPython.display import display, HTML
from tqdm import tqdm_notebook as tqdm
import numpy as np
import os
import pandas as pd
import spark_df_profiling.formatters as formatters, spark_df_profiling.templates as templates
from matplotlib import pyplot as plt
from pkg_resources import resource_filename
import six
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql.functions import (abs as df_abs, col, count, countDistinct, 
                                   max as df_max, mean, min as df_min,  
                                   sum as df_sum, when,variance, stddev, kurtosis, skewness)

###Spark initialization and configuration
config = pyspark.SparkConf().setAll([(("spark.sql.execution.arrow.enabled", "true")),('spark.executor.memory', '8g'), ('spark.executor.cores', '3'), ('spark.cores.max', '3'), ('spark.driver.memory','8g')])
spark = SparkSession.builder.config(conf=config).master("local[*]").appName("Spark Profiler").getOrCreate()
sc = spark.sparkContext

In [2]:
###Dummy data
def gen_pdf(num_records):
    return pd.DataFrame(np.random.rand(num_records, 2), columns=list("ab"))

def create_sparkFrame():
    pdf = gen_pdf(1000)
    return spark.createDataFrame(pdf)


In [3]:
def process_data(dataFrame):
    table_stats = {"n": a.count()}
    variable_stats=pd.DataFrame()
    for colum in tqdm(a.columns):
        variable_stats=pd.concat([variable_stats,describe_data(a, colum, table_stats["n"]).toPandas()],axis=1)   
    corr_reject=0.1
    table_stats["nvar"] = len(a.columns)
    table_stats["total_missing"] = float(variable_stats.loc["n_missing"].sum()) / (table_stats["n"] * table_stats["nvar"])
    memsize = 0
    table_stats['memsize'] = formatters.fmt_bytesize(memsize)
    table_stats['recordsize'] = formatters.fmt_bytesize(memsize / table_stats['n'])
    typdict={1.0:'NUM',2.0:'CAT'}
    variable_stats.loc['type']=variable_stats.loc['type'].map(typdict)
    table_stats.update({k: 0 for k in ("NUM", "DATE", "CONST", "CAT", "UNIQUE", "CORR")})
    table_stats.update(dict(variable_stats.loc['type'].value_counts()))
    table_stats['REJECTED'] = table_stats['CONST'] + table_stats['CORR']

    description_set=describe_sets(table_stats,variable_stats)
    return description_set


In [174]:
def describe_sets(table_stats,variable_stats): 
    freq_dict = {}
    for var in variable_stats:
        if "value_counts" not in variable_stats[var]:
            pass
        elif not(variable_stats[var]["value_counts"] is np.nan):
            freq_dict[var] = variable_stats[var]["value_counts"]
        else:
            pass
    try:
        variable_stats = variable_stats.drop("value_counts")
    except ValueError:
        pass

    return {'table': table_stats, 'variables': variable_stats.T, 'freq': freq_dict}

In [12]:

def to_html(sample, stats_object):

    """
    Generate a HTML report from summary statistics and a given sample
    Parameters
    ----------
    sample: DataFrame containing the sample you want to print
    stats_object: Dictionary containing summary statistics. Should be generated with an appropriate describe() function

    Returns
    -------
    str, containing profile report in HTML format
    """

    n_obs = stats_object['table']['n']

    value_formatters = formatters.value_formatters
    row_formatters = formatters.row_formatters

    if not isinstance(sample, pd.DataFrame):
        raise TypeError("sample must be of type pandas.DataFrame")

    if not isinstance(stats_object, dict):
        raise TypeError("stats_object must be of type dict. Did you generate this using the spark_df_profiling.describe() function?")

    if set(stats_object.keys()) != {'table', 'variables', 'freq'}:
        raise TypeError("stats_object badly formatted. Did you generate this using the spark_df_profiling-eda.describe() function?")

    def fmt(value, name):
        if pd.isnull(value):
            return ""
        if name in value_formatters:
            return value_formatters[name](value)
        elif isinstance(value, float):
            return value_formatters[formatters.DEFAULT_FLOAT_FORMATTER](value)
        else:
            if sys.version_info.major == 3:
                return str(value)
            else:
                return unicode(value)

    def freq_table(freqtable, n, var_table, table_template, row_template, max_number_of_items_in_table):
        print('Inside Freq')
        local_var_table = var_table.copy()
        freq_other_prefiltered = freqtable["***Other Values***"]
        freq_other_prefiltered_num = freqtable["***Other Values Distinct Count***"]
        freqtable = freqtable.drop(["***Other Values***", "***Other Values Distinct Count***"])

        freq_rows_html = u''

        freq_other = sum(freqtable[max_number_of_items_in_table:]) + freq_other_prefiltered
        freq_missing = var_table["n_missing"]
        max_freq = max(freqtable.values[0], freq_other, freq_missing)
        try:
            min_freq = freqtable.values[max_number_of_items_in_table]
        except IndexError:
            min_freq = 0

        # TODO: Correctly sort missing and other

        def format_row(freq, label, extra_class=''):
            width = int(freq / float(max_freq) * 99) + 1
            if width > 20:
                label_in_bar = freq
                label_after_bar = ""
            else:
                label_in_bar = "&nbsp;"
                label_after_bar = freq

            return row_template.render(label=label,
                                       width=width,
                                       count=freq,
                                       percentage='{:2.1f}'.format(freq / float(n) * 100),
                                       extra_class=extra_class,
                                       label_in_bar=label_in_bar,
                                       label_after_bar=label_after_bar)

        for label, freq in six.iteritems(freqtable[0:max_number_of_items_in_table]):
            freq_rows_html += format_row(freq, label)

        if freq_other > min_freq:
            freq_rows_html += format_row(freq_other,
                                         "Other values (%s)" % (freqtable.count() 
                                                                + freq_other_prefiltered_num 
                                                                - max_number_of_items_in_table),
                                         extra_class='other')

        if freq_missing > min_freq:
            freq_rows_html += format_row(freq_missing, "(Missing)", extra_class='missing')

        return table_template.render(rows=freq_rows_html, varid=hash(idx))

    # Variables
    rows_html = u""
    messages = []

    for idx, row in stats_object['variables'].iterrows():

        formatted_values = {'varname': idx, 'varid': hash(idx)}
        row_classes = {}

        for col, value in six.iteritems(row):
            formatted_values[col] = fmt(value, col)

        for col in set(row.index) & six.viewkeys(row_formatters):
            row_classes[col] = row_formatters[col](row[col])
            if row_classes[col] == "alert" and col in templates.messages:
                messages.append(templates.messages[col].format(formatted_values, varname = formatters.fmt_varname(idx)))

                
                
        if row['type'] == 'CAT':
            print( n_obs)
            print(stats_object['freq'])
            print(stats_object['variables'].ix[idx])
            formatted_values['minifreqtable'] = freq_table(stats_object['freq'][idx], n_obs, stats_object['variables'].ix[idx],
                                                           templates.template('mini_freq_table'), templates.template('mini_freq_table_row'), 3)
            #formatted_values['freqtable'] = freq_table(stats_object['freq'][idx], n_obs, stats_object['variables'].ix[idx],
            #                                           templates.template('freq_table'), templates.template('freq_table_row'), 20)
            if row['distinct_count'] > 50:
                messages.append(templates.messages['HIGH_CARDINALITY'].format(formatted_values, varname = formatters.fmt_varname(idx)))
                row_classes['distinct_count'] = "alert"
            else:
                row_classes['distinct_count'] = ""

        if row['type'] == 'UNIQUE':
            obs = stats_object['freq'][idx].index

            formatted_values['firstn'] = pd.DataFrame(obs[0:3], columns=["First 3 values"]).to_html(classes="example_values", index=False)
            formatted_values['lastn'] = pd.DataFrame(obs[-3:], columns=["Last 3 values"]).to_html(classes="example_values", index=False)

            if n_obs > 40:
                formatted_values['firstn_expanded'] = pd.DataFrame(obs[0:20], index=range(1, 21)).to_html(classes="sample table table-hover", header=False)
                formatted_values['lastn_expanded'] = pd.DataFrame(obs[-20:], index=range(n_obs - 20 + 1, n_obs+1)).to_html(classes="sample table table-hover", header=False)
            else:
                formatted_values['firstn_expanded'] = pd.DataFrame(obs, index=range(1, n_obs+1)).to_html(classes="sample table table-hover", header=False)
                formatted_values['lastn_expanded'] = ''

        rows_html += templates.row_templates_dict[row['type']].render(values=formatted_values, row_classes=row_classes)

        if row['type'] in {'CORR', 'CONST'}:
            formatted_values['varname'] = formatters.fmt_varname(idx)
            messages.append(templates.messages[row['type']].format(formatted_values))


    # Overview
    formatted_values = {k: fmt(v, k) for k, v in six.iteritems(stats_object['table'])}

    row_classes={}
    for col in six.viewkeys(stats_object['table']) & six.viewkeys(row_formatters):
        row_classes[col] = row_formatters[col](stats_object['table'][col])
        if row_classes[col] == "alert" and col in templates.messages:
            messages.append(templates.messages[col].format(formatted_values, varname = formatters.fmt_varname(idx)))

    messages_html = u''
    for msg in messages:
        messages_html += templates.message_row.format(message=msg)

    overview_html = templates.template('overview').render(values=formatted_values, row_classes = row_classes, messages=messages_html)

    # Sample

    sample_html = templates.template('sample').render(sample_table_html=sample.to_html(classes="sample"))
    # TODO: should be done in the template
    return templates.template('base').render({'overview_html': overview_html, 'rows_html': rows_html, 'sample_html': sample_html})

In [13]:
def custom_output(html):
    chart = HTML(html)
    display(chart)

In [160]:
pdf=pd.read_csv('Book1.csv')
a=spark.createDataFrame(pdf)
a

DataFrame[Index: bigint, Section: string]

In [176]:
a=create_sparkFrame()
description_set=process_data(a)
sample = a.limit(5).toPandas()
html = to_html(sample,description_set)
htmlFinal=templates.template('wrapper').render(content=html)
custom_output(htmlFinal)

HBox(children=(IntProgress(value=0, max=2), HTML(value='')))

   distinct_count  count      mean       min       max  variance  kurtosis       std  skewness         sum  type  top  freq  value_counts  p_unique  is_unique  n_missing  p_missing  p_infinite  n_infinite     range        cv  n_zeros  p_zeros  memorysize
a            1000   1000  0.495149  0.000761  0.996708  0.083714 -1.188574  0.289334  0.067275  495.148644     1  NaN   NaN           NaN       1.0       True          0        0.0           0           0  0.995948  0.584337        0      0.0           0
   distinct_count  count      mean       min       max  variance  kurtosis       std  skewness         sum  type  top  freq  value_counts  p_unique  is_unique  n_missing  p_missing  p_infinite  n_infinite     range        cv  n_zeros  p_zeros  memorysize
b            1000   1000  0.496804  0.000668  0.997083  0.082558 -1.187927  0.287329  0.052608  496.803642     1  NaN   NaN           NaN       1.0       True          0        0.0           0           0  0.996415  0.578355        0  

0,1
Number of variables,2
Number of observations,1000
Total Missing (%),0.0%
Total size in memory,0.0 B
Average record size in memory,0.0 B

0,1
Numeric,2
Categorical,0
Date,0
Text (Unique),0
Rejected,0

0,1
Distinct count,1000
Unique (%),100.0%
Missing (%),0.0%
Missing (n),0
Infinite (%),0.0%
Infinite (n),0

0,1
Mean,0.49515
Minimum,0.00076068
Maximum,0.99671
Zeros (%),0.0%

0,1
Minimum,0.00076068
5-th percentile,
Q1,
Median,
Q3,
95-th percentile,
Maximum,0.99671
Range,0.99595
Interquartile range,

0,1
Standard deviation,0.28933
Coef of variation,0.58434
Kurtosis,-1.1886
Mean,0.49515
MAD,
Skewness,0.067275
Sum,495.15
Variance,0.083714
Memory size,0.0 B

0,1
Distinct count,1000
Unique (%),100.0%
Missing (%),0.0%
Missing (n),0
Infinite (%),0.0%
Infinite (n),0

0,1
Mean,0.4968
Minimum,0.00066813
Maximum,0.99708
Zeros (%),0.0%

0,1
Minimum,0.00066813
5-th percentile,
Q1,
Median,
Q3,
95-th percentile,
Maximum,0.99708
Range,0.99641
Interquartile range,

0,1
Standard deviation,0.28733
Coef of variation,0.57836
Kurtosis,-1.1879
Mean,0.4968
MAD,
Skewness,0.052608
Sum,496.8
Variance,0.082558
Memory size,0.0 B

Unnamed: 0,a,b
0,0.190032,0.672502
1,0.754037,0.165219
2,0.636746,0.764507
3,0.167526,0.075187
4,0.980287,0.916459


In [171]:
 def describe_data(df, column, nrows):
        column_type = df.select(column).dtypes[0][1]
        if ("array" in column_type) or ("stuct" in column_type) or ("map" in column_type):
            raise NotImplementedError("Column {c} is of type {t} and cannot be analyzed".format(c=column, t=column_type))
        
        if column_type in {"tinyint", "smallint", "int", "bigint", "float", "double", "decimal"}:
            results_data=results_data=ks.DataFrame  (
            {'distinct_count': df.select(column).agg(countDistinct(col(column))).collect()[0][0],
             'count': df.select(column).na.drop().select(count(col(column))).collect()[0][0],
             'mean' : df.select(column).na.drop().agg(mean(col(column))).collect()[0][0],
             'min': df.select(column).na.drop().agg(df_min(col(column))).collect()[0][0],
             "max": df.select(column).na.drop().agg(df_max(col(column))).collect()[0][0],
             "variance": df.select(column).na.drop().agg(variance(col(column))).collect()[0][0],
             "kurtosis" : df.select(column).na.drop().agg(kurtosis(col(column))).collect()[0][0],
             "std"    : df.select(column).na.drop().agg(stddev(col(column))).collect()[0][0],
             "skewness" : df.select(column).na.drop().agg(skewness(col(column))).collect()[0][0],
             "sum"     : df.select(column).na.drop().agg(df_sum(col(column))).collect()[0][0],
             "type" : 1,
             "top" : np.nan,
             "freq" :np.nan,
             "value_counts" : np.nan },index=[column])  
        elif column_type in {'string'}:
            results_data=results_data=ks.DataFrame  (
            {'distinct_count': df.select(column).agg(countDistinct(col(column))).collect()[0][0],
             'count': df.select(column).na.drop().select(count(col(column))).collect()[0][0],
             'mean' : np.nan,
             'min': np.nan,
             "max": np.nan,
             "variance": np.nan,
             "kurtosis" : np.nan,
             "std"    : np.nan,
             "skewness" : np.nan,
             "sum"     : np.nan,
             "type" : 2 },index=[column])
            results_data=ks.concat([results_data,category(df, column, nrows)])
        else:
            pass
            
        results_data["p_unique"] = results_data["distinct_count"] / results_data["count"]
        results_data["is_unique"] = results_data["distinct_count"] == nrows
        results_data["n_missing"] = nrows - results_data["count"]
        results_data["p_missing"] = results_data["n_missing"] / nrows
        results_data["p_infinite"] = 0
        results_data["n_infinite"] = 0
        results_data["range"] = results_data["max"] - results_data["min"]
        results_data["cv"] = results_data["std"] / results_data["mean"]
        results_data['n_zeros'] = df.select(column).where(col(column)==0.0).count()
        results_data['p_zeros'] = results_data['n_zeros'] / nrows
        result = results_data.iloc[:1,:]
        result["memorysize"] = 0
        
        print(result)
                
        if (result["count"] > result["distinct_count"] > 1):
            try:
                result["mode"] = result["top"]
            except KeyError:
                result["mode"] = 0
        else:
            try:
                result["mode"] = result["value_counts"].index[0]
            except KeyError:
                result["mode"] = 0
            
            except IndexError:
                result["mode"] = "MISSING" 
                   
        return result.iloc[:1,:].T

### Backup

In [172]:
def category(df, column, nrows):   
    
    
        
        value_counts = (df.select(column).na.drop()
                        .groupBy(column)
                        .agg(count(col(column)))
                        .orderBy("count({c})".format(c=column),ascending=False)
                       ).cache()
        
        stats = (value_counts
                 .limit(1)
                 .withColumnRenamed(column, "top")
                 .withColumnRenamed("count({c})".format(c=column), "freq")
                ).to_koalas()
        
       
        top_50 = value_counts.limit(50).to_koalas()
        top_50_categories = value_counts.limit(50).to_koalas()[column].tolist()                
        top = top_50.set_index(column)["count({c})".format(c=column)]
        stats["value_counts"]=''
        return stats

In [23]:
def category(df, column, nrows):        
        value_counts = (df.select(column).na.drop()
                        .groupBy(column)
                        .agg(count(col(column)))
                        .orderBy("count({c})".format(c=column),ascending=False)
                       ).cache()
        
        # Get the most frequent class:
        stats = (value_counts
                 .limit(1)
                 .withColumnRenamed(column, "top")
                 .withColumnRenamed("count({c})".format(c=column), "freq")
                ).toPandas().ix[0]
        
        top_50 = value_counts.limit(50).toPandas().sort_values("count({c})".format(c=column),
                                                               ascending=False)
        top_50_categories = top_50[column].values.tolist()

        others_count = pd.Series([df.select(column).na.drop()
                        .where(~(col(column).isin(*top_50_categories)))
                        .count()
                        ], index=["***Other Values***"])
        others_distinct_count = pd.Series([value_counts
                                .where(~(col(column).isin(*top_50_categories)))
                                .count()
                                ], index=["***Other Values Distinct Count***"])

        top = top_50.set_index(column)["count({c})".format(c=column)]
        top = top.append(others_count)
        top = top.append(others_distinct_count)
        stats["value_counts"] = top
        stats["type"] = "CAT"
        value_counts.unpersist()
        return ks.from_pandas(stats)