# Data Skipping Sample - Python

[Data skipping](#) can significantly boost the performance of SQL queries by skipping over irrelevant data objects or files based on summary metadata associated with each object.

For every column in the object, the summary metadata might include minimum and maximum values, a list or bloom filter of the appearing values, or other metadata which succinctly represents the data in that column. This metadata is used during query evaluation to skip over objects which have no relevant data.
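
As a rough illustration of the idea, the sketch below shows how minimum and maximum values alone can rule out objects for a predicate such as `temp > 30`. The `ObjectSummary` class, the object names, and the values are hypothetical and only stand in for the metadata the library keeps; this is not the library's implementation.

```python
from dataclasses import dataclass


@dataclass
class ObjectSummary:
    """Hypothetical per-object summary for a single numeric column."""
    name: str
    min_value: float
    max_value: float


def objects_to_scan(summaries, threshold):
    """Return the names of objects that may hold rows with value > threshold."""
    # An object is skipped only when its maximum proves that no row can match.
    return [s.name for s in summaries if s.max_value > threshold]


# Illustrative summaries for three objects of one data set (made-up values).
summaries = [
    ObjectSummary("part-0.parquet", min_value=-5.0, max_value=12.0),
    ObjectSummary("part-1.parquet", min_value=18.0, max_value=35.5),
    ObjectSummary("part-2.parquet", min_value=31.0, max_value=44.0),
]

candidates = objects_to_scan(summaries, threshold=30)
print(candidates)
```

Only the objects whose maximum exceeds the threshold are read; the first object is skipped without being opened.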

All Spark native data formats are supported, including Parquet, ORC, CSV, JSON and Avro. Data skipping is purely a performance optimization, which means that using it does not affect the content of the query results.

To use this feature, you need to create indexes on one or more columns of the data set. After this is done, Spark SQL queries can benefit from data skipping. In general, you should index the columns which are queried most often in the WHERE clause.

## Set up the environment

In [None]:
from metaindex import MetaIndexManager

**(optional)** Set the log level to DEBUG for the metaindex package to view the skipped objects.

In [None]:
log4jLogger = spark.sparkContext._jvm.org.apache.log4j
log4jLogger.LogManager.getLogger('com.ibm.metaindex.search').setLevel(log4jLogger.Level.DEBUG)

### Configure Stocator
For more information on how to configure credentials, see [here](https://github.com/CODAIT/stocator).

See [here](https://cloud.ibm.com/docs/services/cloud-object-storage?topic=cloud-object-storage-endpoints) for the list of endpoints.
Make sure you choose the private endpoint of your bucket.

In [None]:
hconf = spark.sparkContext._jsc.hadoopConfiguration()
hconf.set("fs.cos.service.endpoint", "https://s3.private.us-south.cloud-object-storage.appdomain.cloud")
hconf.set("fs.cos.service.access.key", "<accessKey>")
hconf.set("fs.cos.service.secret.key", "<secretKey>")

## Set up the DataSkipping library
In this example, we set the JVM-wide parameter to a base path under which all of the indexes are stored.

Metadata can be stored on the same storage system as the data, but not under the same path. For more configuration options, see [Data skipping configuration options](#).
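
A quick sanity check before indexing can catch a metadata location that is nested under the data path. The helper below is a hypothetical sketch and not part of the metaindex API; it assumes both locations are URIs whose path component uses `/` separators, and the example URIs are placeholders.

```python
from pathlib import PurePosixPath
from urllib.parse import urlsplit


def metadata_is_under_data(md_location, data_location):
    """Return True if md_location equals data_location or lies beneath it."""
    md, data = urlsplit(md_location), urlsplit(data_location)
    # Different schemes or buckets can never be nested in each other.
    if (md.scheme, md.netloc) != (data.scheme, data.netloc):
        return False
    md_path = PurePosixPath(md.path)
    data_path = PurePosixPath(data.path)
    return md_path == data_path or data_path in md_path.parents


# Placeholder locations, for illustration only.
data_uri = "cos://mybucket.service/location/to/my/data"
good_md_uri = "cos://mybucket.service/location/to/my/base/metadata"
bad_md_uri = "cos://mybucket.service/location/to/my/data/metadata"

print(metadata_is_under_data(good_md_uri, data_uri))
print(metadata_is_under_data(bad_md_uri, data_uri))
```

Comparing path components rather than raw string prefixes avoids treating a sibling such as `.../data2` as nested under `.../data`.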

In [None]:
md_base_location = "cos://mybucket.service/location/to/my/base/metadata"
MetaIndexManager.setDefaultMetaDataStore(spark, 'com.ibm.metaindex.metadata.metadatastore.parquet.Parquet')
# store all metadata under an explicit base path
md_backend_config = {
    "spark.ibm.metaindex.parquet.mdlocation": md_base_location,
    "spark.ibm.metaindex.parquet.mdlocation.type": "EXPLICIT_BASE_PATH_LOCATION",
}
MetaIndexManager.setConf(spark, md_backend_config)

## Indexing a dataset

Skip this step if the data set is already indexed.

Note that each of the index types has a corresponding method in the indexBuilder class of the form:

`add[IndexType]Index(<index_params>)`

For example:

`addMinMaxIndex(col: String)`

`addValueListIndex(col: String)`

`addBloomFilterIndex(col: String)`
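
To give a feel for what the value list and bloom filter index types make possible, here is a toy sketch in plain Python. `ToyBloomFilter` and the sample values are hypothetical and do not reflect how the library stores or evaluates its metadata. A value list allows exact membership tests, while a bloom filter can return false positives but never false negatives, so an object is skipped only when the filter reports that the value is definitely absent.

```python
import hashlib


class ToyBloomFilter:
    """Minimal bloom filter over string values (illustrative only)."""

    def __init__(self, num_bits=256, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit set

    def _positions(self, value):
        # Derive num_hashes bit positions from seeded SHA-256 digests.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{value}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, value):
        for pos in self._positions(value):
            self.bits |= 1 << pos

    def might_contain(self, value):
        # False means "definitely absent"; True means "possibly present".
        return all((self.bits >> pos) & 1 for pos in self._positions(value))


# Hypothetical metadata for one object.
city_values = {"Austin", "Dallas"}   # value list: the exact set of cities
vid_filter = ToyBloomFilter()        # bloom filter: approximate set of vids
for vid in ["a1", "b2", "c3"]:
    vid_filter.add(vid)

# The object can be skipped for city = 'Boston' because the list is exact.
can_skip_for_city = "Boston" not in city_values
# The object cannot be skipped for vid = 'b2' because the value was added.
must_scan_for_vid = vid_filter.might_contain("b2")
print(can_skip_for_city, must_scan_for_vid)
```

Value lists suit columns with few distinct values, while bloom filters keep the metadata compact for high-cardinality columns at the cost of occasional unnecessary reads.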

In [None]:
dataset_location = "cos://mybucket.service/location/to/my/data"
md_backend = 'com.ibm.metaindex.metadata.metadatastore.parquet.ParquetMetadataBackend'
reader = spark.read.format("parquet")
im = MetaIndexManager(spark, dataset_location, md_backend)

# remove existing index first
if im.isIndexed():
    im.removeIndex()

# indexing
print("Building the index:")
im.indexBuilder()\
  .addMinMaxIndex("temp")\
  .addValueListIndex("city")\
  .addBloomFilterIndex("vid")\
  .build(reader)\
  .show(10, False)

**(optional)** To refresh an indexed dataset, use:

In [None]:
im.refreshIndex(reader).show(10, False)

View the index status:

In [None]:
im.indexStats().show(10, False)

## Using the data skipping indexes 

### Injecting the data skipping rule and enabling data skipping
The rule injection should be done only once per Spark session.

In [None]:
# inject the data skipping rule
MetaIndexManager.injectDataSkippingRule(spark)

# enable data skipping
MetaIndexManager.enableFiltering(spark)

# you can disable data skipping at any time by running:
# MetaIndexManager.disableFiltering(spark)

## Run a query

In [None]:
df = reader.load(dataset_location)
df.createOrReplaceTempView("metergen")
spark.sql("select count(*) from metergen where temp > 30").show()
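
Because data skipping should not change query results, one way to gain confidence in a new index is to run the same query with filtering disabled and then enabled, and compare the two results. The helper below is a hypothetical sketch: it takes plain callables so that it runs without a Spark session, and the commented lines show how it might be wired to the calls used in this notebook.

```python
def results_match(run_query, enable_skipping, disable_skipping):
    """Run one query with skipping off, then on, and compare the results."""
    disable_skipping()
    baseline = run_query()
    enable_skipping()  # skipping stays enabled after the check
    with_skipping = run_query()
    return baseline == with_skipping


# In this notebook the callables could be wired up as follows (assumes the
# `spark` session and the `metergen` view defined above):
#   run_query = lambda: spark.sql(
#       "select count(*) from metergen where temp > 30").collect()
#   enable_skipping = lambda: MetaIndexManager.enableFiltering(spark)
#   disable_skipping = lambda: MetaIndexManager.disableFiltering(spark)

# Stand-in callables so the sketch runs on its own.
state = {"skipping": True}
ok = results_match(
    run_query=lambda: [("count", 42)],
    enable_skipping=lambda: state.update(skipping=True),
    disable_skipping=lambda: state.update(skipping=False),
)
print(ok)
```

A mismatch would suggest stale metadata, in which case refreshing the index is a reasonable first step.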

## View the data skipping statistics

In [None]:
MetaIndexManager.getLatestQueryAggregatedStats(spark).show(10, False)

**(optional)** Clear the stats before the next query (otherwise, the stats will accumulate):


In [None]:
MetaIndexManager.clearStats(spark)