In this notebook we aim to create a general spark script to perform initial table profiling. the tasks involve:

- ### Add the data as RDD
    - multiple formats: csv, delta, text, etc.
- ### Overal dataset information
    - Number of Variables
    - Number of records
    - total missing percentage
    - Total size in memory
    - Average record size in memory
    - Recomended partition size during spark analysis
    - Variable analysis:
        - Number of categorical
        - Number of Numeric
        - Number of Date
        - Number of Text(unique)
        - Number of rejected
    - some overal info about the dataset like:
        - GeoLocation has 7315 / 19.0% missing values Missing
        - GeoLocation has a high cardinality: 17100 distinct values Warning
        - mass_g is highly skewed (γ1 = 76.916)
        - recclass has a high cardinality: 466 distinct values Warning
        - reclat has 7315 / 19.0% missing values Missing
        - reclat has 6438 / 14.1% zeros
        - reclat_city is highly correlated with reclat (ρ = 0.99423) Rejected
        - reclong has 7315 / 19.0% missing values Missing
        - reclong has 6214 / 13.6% zeros
        - source has constant value NASA Rejected
- ### Create a table with the column names and Analysis of each (D) 
    - Devide columns into numeric vs none-nummeric
    - some of the features for nun-numeric:
        - Record count
        - Unique Values
        - Empty Strings
        - Null Values
        - Percent Fill
        - Percent Numeric
        - Max Length
        - if float,int 
            - max_value
            - min_value
            - mean
            - std
        - if string
            - shortest value 
            - longest value
            - average length
        - if date time
            - min_date
            - max_date
    - include some graphs for each (seperate numeric vs categorical)

- create a file_to_be_profiled, that can handle csv,tsv,delta,gz data formats and in a for loop profile all
- Create a front-end for the results using html maybe connect to a react front-end


- The codebase has refferences to other repositories like: [this](https://github.com/gandalf1819/NYCOpenData-Profiling-Analysis/blob/master/Task-1-Generic-profiling.py) and [this](https://github.com/pandas-profiling/pandas-profiling)

### Add Libraries

In [1]:
import os
import sys

In [2]:
# Pyspark sessions
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as D

#### Create spark session

In [4]:
spark = SparkSession.builder.appName('SparkProfiling').getOrCreate()
sc = spark.sparkContext                # if you need the sparkConext
spark

In [112]:
# If folder 'Result' does not exist, create one
def create_dir(path):
    if not os.path.exists(path):
        os.makedirs(path)
create_dir('../Result') 

In [115]:
# List data inputs ready to use
def get_data_input_list(path):
    return os.listdir(path)
files = get_data_input_list('../Data')
profiling_input = files[0]             # choose the input file you want
profiling_input

'data_paper_sample_10k.csv'

### Helper Functions

In [8]:
def count_not_null(c, nan_as_null=False):
    pred = F.col(c).isNotNull() & (~isnan(c) if nan_as_null else F.lit(True))
    return F.sum(pred.cast('integer')).alias(c)

In [81]:
def get_top_five_frequent_record(DF, col):
    frequency_dataframe=DF.groupBy(col).count().sort(F.desc('count'))
    frequency_dataframe=frequency_dataframe.where(F.col(col).isNotNull())
    top_frequency_five=[]
    if frequency_dataframe.count()<5:
        top_frequency_five=[row[0] for row in frequency_dataframe.collect()]
    else:
        top_frequency_five=[row[0] for row in frequency_dataframe.take(5)]
    return top_frequency_five

In [199]:
# This method, randomly samples 10% of the data and cache the samples to find the size. This method is not acurate and can be costly
def estimate_rdd_memory_size_mb(df):
    estimated_df = df.sample(fraction = 0.1)
    estimated_df.cache().foreach(lambda x: x)
    catalyst_plan = estimated_df._jdf.queryExecution().logical()
    test_kb = spark._jsparkSession.sessionState().executePlan(catalyst_plan).optimizedPlan().stats().sizeInBytes()
    return test_kb * 10 / (1024 * 1024)


### Type cast validations

In [86]:
def validate_string_to_integer(d):
    if type(d)==str:
        try:
            z=int(d)
            return z
        except:
            return None
    else:
        return None
    
def validate_string_to_float(d):
    if type(d)==str:
        try:
            z=float(d)
            return z
        except:
            return None
    else:
        return None

def validate_date(d):
    try:
        z=parse(d)
        return str(z)
    except:
        return None

### Read the DataFrame

In [116]:
# read the dataframe
filepath='../Data/'+profiling_input
DF = spark.read.format('csv').options(header='true',inferschema='true').load(filepath)
DF.show(5)

+---+------+--------------------+--------------------+----+----------+----------+--------+----------+---------+------+-----+----+--------------------+--------------------+--------------------+--------------------+
|_c0|    id|             authors|               title|year|n_citation|page_start|page_end|  doc_type|publisher|volume|issue| doi|          references|                 fos|               venue|    indexed_abstract|
+---+------+--------------------+--------------------+----+----------+----------+--------+----------+---------+------+-----+----+--------------------+--------------------+--------------------+--------------------+
|  0| 75067|[{'name': 'Narcis...|Comparing Estimat...|2001|         0|       348|     353|Conference|     IIIS|  null| null|null|[1989212489, 2007...|[{'name': 'Softwa...|{'raw': 'World Mu...|                null|
|  1| 40449|[{'name': 'José L...|Software Workbenc...|2001|         1|       517|     522|      null|     null|  null| null|null|               

In [10]:
columns_names = DF.columns
columns_names

['_c0',
 'id',
 'authors',
 'title',
 'year',
 'n_citation',
 'page_start',
 'page_end',
 'doc_type',
 'publisher',
 'volume',
 'issue',
 'doi',
 'references',
 'fos',
 'venue',
 'indexed_abstract']

In [11]:
total_rows = DF.count()
total_rows

291

In [187]:
# general database analysis
general_df_info = {}

general_df_info["total_records"] = total_rows
general_df_info["total_variables"] = len(columns_names)
general_df_info["total_missing_percentage"] = sum(compute_null_columns) / (general_df_info["total_records"] * general_df_info["total_variables"])
general_df_info["estimate_size_in_memory_Mb"] = estimate_rdd_memory_size_mb(DF)
general_df_info["average_record_size_in_memory_Mb"] = general_df_info["estimate_size_in_memory_Mb"] / total_rows

variable_analysis = {}
variable_analysis['Numeric'] = len([item[0] for item in DF.dtypes if (item[1].startswith('int') or item[1].startswith('float'))])
variable_analysis['Categorical'] = len([item[0] for item in DF.dtypes if item[1].startswith('string')])
variable_analysis['Date'] = len([item[0] for item in DF.dtypes if item[1].startswith('date')])

In [188]:
general_df_info

{'total_records': 291,
 'total_variables': 17,
 'total_missing_percentage': 0.30402263998382856,
 'estimate_size_in_memory_Mb': 0.4421710968017578,
 'average_record_size_in_memory_Mb': 0.0015194883051606798}

In [189]:
variable_analysis

{'Numeric': 2, 'Categorical': 15, 'Date': 0}

## Compute column related analysis

In [79]:
compute_unique_values = DF.agg(*[F.countDistinct(F.col(c)) for c in DF.columns]).collect()[0]
compute_unique_values

291

In [12]:
compute_not_null_columns = DF.agg(*[count_not_null(c) for c in DF.columns]).collect()[0]
compute_not_null_columns

Row(_c0=291, id=291, authors=291, title=291, year=291, n_citation=291, page_start=244, page_end=222, doc_type=200, publisher=89, volume=78, issue=31, doi=42, references=106, fos=281, venue=287, indexed_abstract=117)

In [57]:
compute_null_columns=[(total_rows-count_notNull) for count_notNull in compute_not_null_columns]
compute_null_columns

[0, 0, 0, 0, 0, 0, 47, 69, 91, 202, 213, 260, 249, 185, 10, 4, 174]

In [60]:
compute_null_proportion=[round((count_Nulls / total_rows)*100, 3) for count_Nulls in compute_null_columns]
compute_null_proportion

[0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 16.151,
 23.711,
 31.271,
 69.416,
 73.196,
 89.347,
 85.567,
 63.574,
 3.436,
 1.375,
 59.794]

### Create UDF Functions for type casting

In [89]:
get_int=F.udf(lambda x: x if type(x)==int else None, D.IntegerType())
get_str=F.udf(lambda x: x if type(x)==str else None, D.StringType())
get_flt=F.udf(lambda x: x if type(x)==float else None, D.FloatType())
get_dt=F.udf(lambda x: validate_date(x), D.StringType())
get_string_int=F.udf(lambda x: validate_string_to_integer(x), D.IntegerType())
get_string_flt=F.udf(lambda x: validate_string_to_float(x), D.FloatType())

### Column wide analysis

In [104]:
cols_data=[]

for i, cols in enumerate(DF.columns):
    if total_rows==0:
        continue
    columns_data={}
    columns_data['column_name']=cols
    columns_data['dtype']= DF.dtypes[i][1]
    columns_data['record_count']=total_rows
    columns_data['unique_values']=compute_unique_values[i]
    columns_data['number_non_empty_cells']=compute_not_null_columns[i]
    columns_data['number_empty_cells']=compute_null_columns[i]
    columns_data['null_proportion']=compute_null_proportion[i]
    columns_data['top_five_value'] = get_top_five_frequent_record(DF, cols)
    
    
    # Data type specific analysis
    int_col=cols+' '+'int_type'
    str_col=cols+' '+'str_type'
    float_col=cols+ ' '+ 'float_type'
    date_col=cols+' '+'date_type'
    str_int_col=cols + ' '+'str_int'
    str_float_col=cols +' '+'str_float'
    
    df=DF.select([get_int(cols).alias(int_col), 
                  get_str(cols).alias(str_col), 
                  get_flt(cols).alias(float_col), 
                  get_dt(cols).alias(date_col),
                  get_string_int(cols).alias(str_int_col),
                  get_string_flt(cols).alias(str_float_col)
                 ])
    
    int_df = df.select(int_col).where(F.col(int_col).isNotNull())
    str_df = df.select(str_col).where(F.col(str_col).isNotNull())
    float_df = df.select(float_col).where(F.col(float_col).isNotNull())
    date_df = df.select(date_col).where(F.col(date_col).isNotNull())
    str_int_df = df.select(str_int_col).where(F.col(str_int_col).isNotNull())
    str_float_df = df.select(str_float_col).where(F.col(str_float_col).isNotNull())
    
    columns_data['data_types']=[]
    
    if float_df.count()>1:
        type_data={}
        type_data['type']='REAL'
        type_data['count']=float_df.count()
        type_data['max_value']=float_df.agg({float_col: "max"}).collect()[0][0]
        type_data['min_value']=float_df.agg({float_col: "min"}).collect()[0][0]
        type_data['mean']=float_df.agg({float_col: "avg"}).collect()[0][0]
        type_data['stddev']=float_df.agg({float_col: 'stddev'}).collect()[0][0]
        columns_data['data_types'].append(type_data)

    if int_df.count()>1:
        type_data={}
        type_data['type']='INTEGER (LONG)'
        type_data['count']=int_df.count()
        type_data['max_value']=int_df.agg({int_col: 'max'}).collect()[0][0]
        type_data['min_value']=int_df.agg({int_col: 'min'}).collect()[0][0]
        type_data['mean']=int_df.agg({int_col: 'avg'}).collect()[0][0]
        type_data['stddev']=int_df.agg({int_col: 'stddev'}).collect()[0][0]
        columns_data['data_types'].append(type_data)

    if str_df.count()>1:
        type_data={'type':'TEXT', 'count': str_df.count()}
        str_rows=str_df.distinct().collect()
        str_arr=[row[0] for row in str_rows]
        if len(str_arr)<=5:
            type_data['shortest_values']=str_arr
            type_data['longest_values']=str_arr

        else:
            str_arr.sort(key=len, reverse=True)
            type_data['shortest_values']=str_arr[-5:]
            type_data['longest_values']=str_arr[:5]

        type_data['average_length']=sum(map(len, str_arr))/len(str_arr)
        columns_data['data_types'].append(type_data)

    if date_df.count()>1:
        type_data={"type":"DATE/TIME", "count":date_df.count()}
        min_date, max_date = date_df.select(F.min(date_col), F.max(date_col)).first()
        type_data['max_value']=max_date
        type_data['min_value']=min_date
        columns_data['data_types'].append(type_data)

    if str_float_df.count()>1:
        type_data={}
        type_data['type']='REAL'
        type_data['count']=str_float_df.count()
        type_data['max_value']=str_float_df.agg({str_float_col: "max"}).collect()[0][0]
        type_data['min_value']=str_float_df.agg({str_float_col: "min"}).collect()[0][0]
        type_data['mean']=str_float_df.agg({str_float_col: "avg"}).collect()[0][0]
        type_data['stddev']=str_float_df.agg({str_float_col: 'stddev'}).collect()[0][0]
        columns_data['data_types'].append(type_data)

    if str_int_df.count()>1:
        type_data={}
        type_data['type']='INTEGER (LONG)'
        type_data['count']=str_int_df.count()
        type_data['max_value']=str_int_df.agg({str_int_col: 'max'}).collect()[0][0]
        type_data['min_value']=str_int_df.agg({str_int_col: 'min'}).collect()[0][0]
        type_data['mean']=str_int_df.agg({str_int_col: 'avg'}).collect()[0][0]
        type_data['stddev']=str_int_df.agg({str_int_col: 'stddev'}).collect()[0][0]
        columns_data['data_types'].append(type_data)
    cols_data.append(columns_data)


In [105]:
# The output in Json Format
cols_data

[{'column_name': '_c0',
  'dtype': 'int',
  'record_count': 291,
  'unique_values': 291,
  'number_non_empty_cells': 291,
  'number_empty_cells': 0,
  'null_proportion': 0.0,
  'top_five_value': [44, 159, 192, 271, 31],
  'data_types': [{'type': 'INTEGER (LONG)',
    'count': 291,
    'max_value': 290,
    'min_value': 0,
    'mean': 145.0,
    'stddev': 84.1486779456457}]},
 {'column_name': 'id',
  'dtype': 'int',
  'record_count': 291,
  'unique_values': 291,
  'number_non_empty_cells': 291,
  'number_empty_cells': 0,
  'null_proportion': 0.0,
  'top_five_value': [73220, 136660, 2067561, 5140417, 8232766],
  'data_types': [{'type': 'INTEGER (LONG)',
    'count': 291,
    'max_value': 8870360,
    'min_value': 40449,
    'mean': 4705529.972508591,
    'stddev': 2536564.402568419}]},
 {'column_name': 'authors',
  'dtype': 'string',
  'record_count': 291,
  'unique_values': 291,
  'number_non_empty_cells': 291,
  'number_empty_cells': 0,
  'null_proportion': 0.0,
  'top_five_value': ["[

In [106]:
# Turn Json to spark df
spark_df = spark.read.json(sc.parallelize(cols_data))
spark_df.show()

+---------------+----------------+--------------------+------+---------------+------------------+----------------------+------------+--------------------+-------------+
|_corrupt_record|     column_name|          data_types| dtype|null_proportion|number_empty_cells|number_non_empty_cells|record_count|      top_five_value|unique_values|
+---------------+----------------+--------------------+------+---------------+------------------+----------------------+------------+--------------------+-------------+
|           null|             _c0|[{null, 291, null...|   int|            0.0|                 0|                   291|         291|[44, 159, 192, 27...|          291|
|           null|              id|[{null, 291, null...|   int|            0.0|                 0|                   291|         291|[73220, 136660, 2...|          291|
|           null|         authors|[{145.32989690721...|string|            0.0|                 0|                   291|         291|[[{'name': 'Thier...| 

In [195]:
spark_df.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- column_name: string (nullable = true)
 |-- data_types: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- average_length: double (nullable = true)
 |    |    |-- count: long (nullable = true)
 |    |    |-- longest_values: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- max_value: double (nullable = true)
 |    |    |-- mean: double (nullable = true)
 |    |    |-- min_value: double (nullable = true)
 |    |    |-- shortest_values: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- stddev: double (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- dtype: string (nullable = true)
 |-- null_proportion: double (nullable = true)
 |-- number_empty_cells: long (nullable = true)
 |-- number_non_empty_cells: long (nullable = true)
 |-- record_count: long (nullable = true)
 |-- top_five_value:

In [213]:
data_exploded = spark_df.select('column_name', 
                                'dtype', 
                                'record_count',
                                'number_non_empty_cells',
                                'number_empty_cells',
                                'null_proportion',
                                'unique_values',
                                'top_five_value',
                                F.explode('data_types').alias('data_types')
                               )
data_exploded = data_exploded.select('column_name', 
                                     'dtype',
                                     'record_count',
                                     'number_non_empty_cells',
                                     'number_empty_cells',
                                     'null_proportion',
                                     'unique_values',
                                     'top_five_value', 
                                     'data_types.max_value', 
                                     'data_types.min_value',
                                     'data_types.mean',
                                     'data_types.stddev',
                                     'data_types.count',
                                     'data_types.longest_values',
                                     'data_types.shortest_values',
                                     'data_types.average_length',
                                     'data_types.type',
                                    )           
data_exploded.show()

+-----------+------+------------+----------------------+------------------+---------------+-------------+--------------------+---------+---------+------------------+------------------+-----+--------------------+--------------------+------------------+--------------+
|column_name| dtype|record_count|number_non_empty_cells|number_empty_cells|null_proportion|unique_values|      top_five_value|max_value|min_value|              mean|            stddev|count|      longest_values|     shortest_values|    average_length|          type|
+-----------+------+------------+----------------------+------------------+---------------+-------------+--------------------+---------+---------+------------------+------------------+-----+--------------------+--------------------+------------------+--------------+
|        _c0|   int|         291|                   291|                 0|            0.0|          291|[44, 159, 192, 27...|    290.0|      0.0|             145.0|  84.1486779456457|  291|         

In [214]:
pandas_df = data_exploded.toPandas()
pandas_df = pandas_df.set_index("column_name")
# pandas_df = pandas_df[['dtype','record_count','number_non_empty_cells','number_empty_cells','null_proportion','unique_values','top_five_value','data_types']]
pandas_df

Unnamed: 0_level_0,dtype,record_count,number_non_empty_cells,number_empty_cells,null_proportion,unique_values,top_five_value,max_value,min_value,mean,stddev,count,longest_values,shortest_values,average_length,type
column_name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1
_c0,int,291,291,0,0.0,291,"[44, 159, 192, 271, 31]",290.0,0.0,145.0,84.14868,291,,,,INTEGER (LONG)
id,int,291,291,0,0.0,291,"[73220, 136660, 2067561, 5140417, 8232766]",8870360.0,40449.0,4705530.0,2536564.0,291,,,,INTEGER (LONG)
authors,string,291,291,0,0.0,291,"[[{'name': 'Thierry Despeyroux', 'id': 2721087...",,,,,291,"[[{'name': 'Érika Cota', 'org': 'PPGC---Inst. ...","[[{'name': 'Li Gong', 'id': 2479801864}], [{'n...",145.329897,TEXT
title,string,291,291,0,0.0,291,[ESL/EFL Websites: What Should the Teachers an...,,,,,291,[A Study of the Experimental Validation of Fau...,"[ 'id': 2155134101}]"", 'id': 98411351}]"", 'i...",61.487973,TEXT
year,string,291,291,0,0.0,12,"[2001, Irvine', Nanjing, {'name': 'Fanny Wa...",,,,,291,[Evaluation of an Automatically Obtained Shape...,"[ 'id': 2661349709}, Villetaneuse, Irvine', ...",30.166667,TEXT
year,string,291,291,0,0.0,12,"[2001, Irvine', Nanjing, {'name': 'Fanny Wa...",2001.0,2001.0,2001.0,0.0,280,,,,REAL
year,string,291,291,0,0.0,12,"[2001, Irvine', Nanjing, {'name': 'Fanny Wa...",2001.0,2001.0,2001.0,0.0,280,,,,INTEGER (LONG)
n_citation,string,291,291,0,0.0,46,"[0, 1, 2, 4, 3]",,,,,291,"[ People's Republic of China#TAB#"""", Dual Pert...","[6, 9, 1, 4, 2]",5.26087,TEXT
n_citation,string,291,291,0,0.0,46,"[0, 1, 2, 4, 3]",2001.0,0.0,38.10915,242.4903,284,,,,REAL
n_citation,string,291,291,0,0.0,46,"[0, 1, 2, 4, 3]",2001.0,0.0,38.10915,242.4903,284,,,,INTEGER (LONG)


## TODO:
- create front-end for the results using json and html template building
- save report as csv or pdf
- perform relational analysis: correl , cordinality, skewness, etc.