In [70]:
from profiling.context_manager.spark_context_manager import SparkContextManager
from profiling.importer.importer import Importer

In [71]:
spark_manager = SparkContextManager()
spark = spark_manager.get_spark_session()

In [72]:
importer = Importer(SparkSession=spark)
profiling_df = importer.import_csv_table(file_path='./data/data_paper_sample_10k.csv', header='True', inferschema='True')

In [73]:
from profiling.config import table_profiler_config

print(table_profiler_config)

<module 'profiling.config.table_profiler_config' from '/Users/erfankashani/code/src/github.com/erfan_github/data_profiler/profiling/config/table_profiler_config.py'>


In [49]:
estimated_df = profiling_df.sample(fraction = 0.5)
estimated_df.cache().foreach(lambda x: x)
catalyst_plan = estimated_df._jdf.queryExecution().logical()
test_kb = spark._jsparkSession.sessionState().executePlan(catalyst_plan).optimizedPlan().stats().sizeInBytes()
print(test_kb * 50 / (1000000))

10.01905


In [69]:
[col[0] for col in profiling_df.dtypes]

['_c0',
 'id',
 'authors',
 'title',
 'year',
 'n_citation',
 'page_start',
 'page_end',
 'doc_type',
 'publisher',
 'volume',
 'issue',
 'doi',
 'references',
 'fos',
 'venue',
 'indexed_abstract']

In [76]:
from profiling.profiler.table_profiler import TableProfiler
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, BooleanType, DateType, LongType

spark = SparkSession.builder.appName('TestSparkSession').getOrCreate()
sc = spark.sparkContext

input_data = [(1,500,200.0, 'Tesla',True,datetime.strptime('2019-12-01','%Y-%m-%d')),
                (2,0,200.0, 'Microsoft',True,datetime.strptime('2019-12-02','%Y-%m-%d')),
                (3,0,0.0, 'Google',False,datetime.strptime('2019-12-03','%Y-%m-%d')),
                (4,1000,0.0, 'Apple',True,datetime.strptime('2019-12-04','%Y-%m-%d')),
                (5,None,34.0, 'Costco',True,datetime.strptime('2019-12-05','%Y-%m-%d')),
                (6,None,None, 'Walmart',False,datetime.strptime('2019-12-06','%Y-%m-%d')),
                (7,34,None, 'Target',True,datetime.strptime('2019-12-07','%Y-%m-%d')),
                (8,10,20.0, 'Home Depot',True,datetime.strptime('2019-12-08','%Y-%m-%d'))
                ]

expected_data = [(8, 6, 3, 2, 1)]

input_schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('revenue', LongType(), True),
    StructField('budget', FloatType(), True),
    StructField('company_name', StringType(), True),
    StructField('is_active', BooleanType(), True),
    StructField('report_date', DateType(), True),
])
expected_schema = StructType([
        StructField('record_count', IntegerType(), True),
        StructField('column_count', IntegerType(), True),
        StructField('numeric_column_count', IntegerType(), True),
        StructField('categorical_column_count', IntegerType(), True),
        StructField('date_column_count', IntegerType(), True),
    ])

input_df = spark.createDataFrame(data=spark.sparkContext.parallelize(input_data), schema=input_schema)
expected_df = spark.createDataFrame(data=spark.sparkContext.parallelize(expected_data), schema=expected_schema)
table_profiler = TableProfiler(spark, input_df)
output_df = table_profiler.process()

In [77]:
output_df.show()

+------------+------------+--------------------+------------------------+-----------------+
|record_count|column_count|numeric_column_count|categorical_column_count|date_column_count|
+------------+------------+--------------------+------------------------+-----------------+
|           8|           6|                   2|                       2|                1|
+------------+------------+--------------------+------------------------+-----------------+



In [80]:
assert output_df.schema == expected_df.schema

In [81]:
input_df.dtypes

[('id', 'int'),
 ('revenue', 'bigint'),
 ('budget', 'float'),
 ('company_name', 'string'),
 ('is_active', 'boolean'),
 ('report_date', 'date')]