In [1]:
import pandas_profiling
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import pandas as pd

# Local Spark session for this notebook. The hadoop-aws package is requested through
# spark.jars.packages (presumably for s3a:// access — no visible cell uses it; confirm).
spark = (
    SparkSession.builder
    .master('local')
    .appName('Capstone_Project')
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.3")
    .getOrCreate()
)

spark

In [2]:
#Dependencies
import pandas as pd
import time
from pyspark.sql import functions as F
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.types import TimestampType
import pandas_profiling as pdf

In [3]:
#####Spark dataframe
# Path of the sample CSV to profile. It can be overridden with the PROFILING_INPUT_CSV environment
# variable so the notebook runs on machines other than the author's. It defaults to the original local path.
import os

INPUT_CSV_PATH = os.environ.get(
    'PROFILING_INPUT_CSV',
    r'C:\Users\luisj\Documents\Project Folders\Pandas_profiling\testing_lg.csv',
)
# header=True takes column names from the first line. No schema is inferred, so every column is
# loaded as a string (see the data_types column in the profiling output below).
testing_df = spark.read.csv(INPUT_CSV_PATH, header=True)

## Spark describe function:
- This function gives you the count, mean, standard deviation, min and max at a column level.
- This could be useful for populating the profiling_table in Hive, since it computes min, max and count in a single line of code.
- We can select just the columns that we require, e.g. scr_sys and business_date.
- The table below is the output of running it on a table of ~100 rows (99 data rows) of random data.

In [23]:
####spark describe
# describe() yields one row per statistic (count, mean, stddev, min, max) and one column per input
# column. That result is only a handful of rows, so collecting it to pandas for display is cheap.
describe_spark_df = testing_df.describe()
profilingt_df = describe_spark_df.toPandas()
profilingt_df

Unnamed: 0,summary,p_id,p_inst_id,p_src_id,a_id,a_inst_id,a_src_id,p_a_role_cd,p_a_strt_dt,m_ch,c_flg,e_fd_r_id,p_a_role,p_a_shrt_nm,d_mail_p_id,d_mail_p_cd,e_b_day,s_sys_id,s_sys_inst_id
0,count,99.0,99,99,99.0,99,99,99,99.0,99,99,99.0,99.0,99.0,99.0,99.0,99,99,99
1,mean,582730122.7676767,,,538765920.4040405,,,,,,,1234567.0,,,,,,,
2,stddev,243733572.4680058,,,258629377.9107589,,,,,,,0.0,,,,,,,
3,min,123603164.0,ABC,DEF,112815503.0,ABC,SRC,OOO,,AC8720717025944,N,1234567.0,,,,,05/05/2019,AAA,ABC
4,max,986873542.0,ABC,DEF,998871315.0,ABC,SRC,OWN,,ZY1868772203810,N,1234567.0,,,,,20/01/2019,CCC,ABC


## Pandas describe function:
- The pandas describe function gives you the count, unique, top and frequency at a column level.
- Testing is required to understand pandas performance. Unlike Spark, pandas is not optimized for parallel processing with Hive, so running the method, or holding a large dataframe in memory, risks running out of memory.

In [24]:
# toPandas() collects every row of the Spark DataFrame into local (driver) memory.
pd_testing_df = testing_df.toPandas()
pandas_describe_df = pd_testing_df.describe()
pandas_describe_df

Unnamed: 0,p_id,p_inst_id,p_src_id,a_id,a_inst_id,a_src_id,p_a_role_cd,p_a_strt_dt,m_ch,c_flg,e_fd_r_id,p_a_role,p_a_shrt_nm,d_mail_p_id,d_mail_p_cd,e_b_day,s_sys_id,s_sys_inst_id
count,99,99,99,99,99,99,99,99.0,99,99,99,99.0,99.0,99.0,99.0,99,99,99
unique,99,1,1,99,1,1,3,1.0,99,1,1,1.0,1.0,1.0,1.0,3,3,1
top,773362397,ABC,DEF,480181025,ABC,SRC,OWN,,KQ9963601117174,N,1234567,,,,,10/10/2019,BBB,ABC
freq,1,99,99,1,99,99,91,99.0,1,99,99,99.0,99.0,99.0,99.0,47,52,99


## Pandas profiling 
- Below is the code to get the pandas profiling report.
- This report provides a summary at the table level, plus distinct, count and null values at the column level. It also includes counts for each distinct value in a column, which shows the number of rows per business_date and per source.
- To get this running we need to load an entire table, or the desired columns, into a pandas dataframe.
- Out-of-memory errors could occur, as pandas is not optimized for use in a distributed environment like Hive. This needs to be tested.



In [21]:
from pandas_profiling import ProfileReport

# Build a profiling report from the pandas copy of the data and write it to an HTML file.
# minimal=True requests the library's reduced ("minimal") report configuration.
report_output_path = 'output.html'
profile_report = ProfileReport(pd_testing_df, minimal=True)
profile_report.to_file(output_file=report_output_path)

variables: 100%|██████████| 18/18 [00:00<00:00, 82.95it/s]
table: 100%|██████████| 1/1 [00:00<00:00, 71.59it/s]
package: 100%|██████████| 1/1 [00:00<00:00, 111.12it/s]
build report structure: 100%|██████████| 1/1 [00:00<00:00, 32.18it/s]


# Pyspark method
- This method runs describe on a Spark dataframe and complements it with other Spark methods to fill gaps that Spark describe doesn't cover, such as null value counts.
- The output is a pandas dataframe that can be loaded into Hive and joined with the profiling table.

In [25]:
# Profiling target: the DataFrame to profile, plus the database/table identifiers that are
# repeated on every row of the profiling output.
df_to_profile = testing_df
database_name, table_name = 'database_name1', 'part_agreement_ss'
# NOTE(review): the visible cells below iterate over df_to_profile.columns, not over this list.
df_columns_to_profile = df_to_profile.columns

In [26]:
# Seed the profiling table with one row per profiled column: where it lives (database/table),
# its name, and its Spark type string as reported by DataFrame.dtypes.
profiled_columns = df_to_profile.columns
column_count = len(profiled_columns)
profiling_df = pd.DataFrame({
    'database_name': [database_name] * column_count,
    'table_name': [table_name] * column_count,
    'column_names': profiled_columns,
    'data_types': [spark_type for _, spark_type in df_to_profile.dtypes],
})
print('initial profiling dataframe created')

initial profiling dataframe created


In [27]:
#####row_counts
# Total row count of the profiled DataFrame (a Spark action). It is repeated on every column's row
# of the profiling table.
num_rows = df_to_profile.count()
profiling_df['num_rows'] = num_rows
# Report the actual count. The message previously announced the row count without printing it.
print(f'num of rows included in the profiling: {num_rows}')

num of rows included in the profiling


In [28]:
#####count nulls
# Count missing values per column.
# - isnan() casts its argument to double, which Spark accepts for numeric and string columns.
# - Spark rejects that cast at analysis time for timestamp columns (hence the old exclusion) and for
#   date/boolean/complex columns as well.
# - Those columns are now checked with isNull() alone. Previously they were dropped from the select,
#   which left num_null as NaN for them after the left merge.
NAN_CHECKABLE_TYPES = {'string', 'tinyint', 'smallint', 'int', 'bigint', 'float', 'double'}


def _missing_value_condition(column_name, spark_type):
    """Return a boolean Column that is true where ``column_name`` is null (or NaN, when NaN-checkable).

    ``spark_type`` is the type string from ``DataFrame.dtypes`` (e.g. ``'string'``, ``'decimal(10,2)'``).
    """
    is_missing = col(column_name).isNull()
    if spark_type in NAN_CHECKABLE_TYPES or spark_type.startswith('decimal'):
        is_missing = isnan(col(column_name)) | is_missing
    return is_missing


# Use the dtypes already known on the DataFrame instead of building a one-column DataFrame per column.
null_count_exprs = [
    count(when(_missing_value_condition(c, spark_type), c)).alias(c)
    for c, spark_type in df_to_profile.dtypes
]
df_nulls_nan = df_to_profile.select(null_count_exprs).toPandas().transpose()
df_nulls_nan = df_nulls_nan.reset_index()
df_nulls_nan.columns = ['column_names', 'num_null']
# Drop num_null from any earlier run of this cell so re-running it doesn't create num_null_x / num_null_y.
profiling_df = pd.merge(
    profiling_df.drop(columns=['num_null'], errors='ignore'),
    df_nulls_nan,
    on=['column_names'],
    how='left',
)

In [30]:
#####describe_function
df_describe_function = df_to_profile.describe().toPandas().transpose()
df_describe_function.columns = ['count', 'mean', 'standar_dev', 'min', 'max']
df_describe_function = df_describe_function.iloc[1:,:]
df_describe_function = df_describe_function.reset_index()
df_describe_function.columns.values[0] = 'column_names'
df_describe_function = df_describe_function[['column_names', 'count', 'mean', 'standar_dev', 'min', 'max']]
profiling_df = pd.merge(profiling_df, df_describe_function, on =['column_names'], how = 'left')  

In [34]:
def color_negative_red(val):
    """Return a CSS ``color`` declaration for a single cell of a pandas Styler.

    Only values whose type is exactly ``int`` are colored:
    - non-positive ints (``<= 0``, so zero as well as negatives) become red;
    - positive ints become green.

    Everything else (floats, strings, bools, ``None``) is rendered black. Note that despite the
    function name, zero is colored red too.
    """
    if type(val) is not int:
        return 'color: black'
    return 'color: %s' % ('red' if val <= 0 else 'green')

In [35]:
# Color each cell of the profiling table element-wise.
# Styler.applymap was deprecated in pandas 2.1 in favour of Styler.map, and removed in pandas 3.0.
# Use whichever method the installed pandas provides. The last expression renders the styled table.
profiling_styler = profiling_df.style
apply_elementwise = getattr(profiling_styler, 'map', None) or profiling_styler.applymap
apply_elementwise(color_negative_red)

Unnamed: 0,database_name,table_name,column_names,data_types,num_rows,num_null,count,mean,standar_dev,min,max
0,database_name1,part_agreement_ss,p_id,string,99,0,99,582730122.7676767,243733572.4680058,123603164,986873542
1,database_name1,part_agreement_ss,p_inst_id,string,99,0,99,,,ABC,ABC
2,database_name1,part_agreement_ss,p_src_id,string,99,0,99,,,DEF,DEF
3,database_name1,part_agreement_ss,a_id,string,99,0,99,538765920.4040405,258629377.9107589,112815503,998871315
4,database_name1,part_agreement_ss,a_inst_id,string,99,0,99,,,ABC,ABC
5,database_name1,part_agreement_ss,a_src_id,string,99,0,99,,,SRC,SRC
6,database_name1,part_agreement_ss,p_a_role_cd,string,99,0,99,,,OOO,OWN
7,database_name1,part_agreement_ss,p_a_strt_dt,string,99,0,99,,,,
8,database_name1,part_agreement_ss,m_ch,string,99,0,99,,,AC8720717025944,ZY1868772203810
9,database_name1,part_agreement_ss,c_flg,string,99,0,99,,,N,N
