# Run Spark use cases for watsonx.data
## Introduction

This notebook demonstrates how IBM® watsonx.data integrates with IBM Analytics Engine and shows how to run Spark use cases for watsonx.data by using Python samples.

IBM Cloud Pak for Data provides a sample Spark use case notebook, which you can use to understand the following functionalities in watsonx.data:

* Accessing tables 

* Ingesting data

* Modifying schema 

* Performing table maintenance activities


This notebook uses data that is publicly available at https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

## Table of Contents

* [Before you begin](#byb)
* [Configuring IBM Analytics Engine](#conf)
* [Listing watsonx.data database](#lb)
* [Creating watsonx.data database](#cdb)
* [Table operations](#to)
* [Summary](#summ)




<a id="byb"></a>
## Before you begin


* You can run the notebook cell by cell by pressing Shift+Enter, or you can run the entire notebook by selecting **Cell -> Run All** from the menu.
* Get the following information from watsonx.data:

    * `<wxd_hms_endpoint>`: Thrift endpoint. For example, `thrift://81823aaf-8a88-4bee-a0a1-6e76a42dc833.cfjag3sf0s5o87astjo0.databases.appdomain.cloud:32683`.
    * `<wxd_hms_username>`: Username for the watsonx.data instance. For IBM Cloud, the default username is `ibmlhapikey`. For IBM Cloud Pak for Data, use the username of a user with the `Metastore admin` role. For more information, see [Managing access to the Hive Metastore](https://www.ibm.com/docs/en/watsonxdata/1.1.x?topic=users-managing-access-hive-metastore).
    * `<wxd_hms_password>`: Hive Metastore (HMS) password.
    * Source bucket details:
        - `<source_bucket_endpoint>`: Endpoint of the source bucket. For example, for a source bucket in the Dallas region, the endpoint is `s3.direct.us-south.cloud-object-storage.appdomain.cloud`.
        - `<source_bucket_access_key>`: Access key of the source bucket.
        - `<source_bucket_secret_key>`: Secret key of the source bucket.
    * Catalog bucket details:
        - `<wxd_bucket_endpoint>`: Endpoint of the catalog bucket. You can get the endpoint details from the watsonx.data instance administrator.
        - `<wxd_bucket_access_key>`: Access key of the catalog bucket. You can get the access key from the watsonx.data instance administrator.
        - `<wxd_bucket_secret_key>`: Secret key of the catalog bucket. You can get the secret key from the watsonx.data instance administrator.
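
The configuration cell in the next section expects these values in Python variables that have the same names as the placeholders. The following is a minimal sketch of one way to define them, assuming the values are supplied through environment variables named after the placeholders in upper case. That naming is an assumption made for illustration, not a watsonx.data convention, and the helper functions `load_settings` and `missing_settings` are hypothetical.

```python
import os

# Names of the placeholders listed above; each becomes a Python variable.
SETTING_NAMES = [
    "wxd_hms_endpoint", "wxd_hms_username", "wxd_hms_password",
    "source_bucket_endpoint", "source_bucket_access_key", "source_bucket_secret_key",
    "wxd_bucket_endpoint", "wxd_bucket_access_key", "wxd_bucket_secret_key",
]


def load_settings(env=None):
    """Read each setting from an upper-case environment variable (assumed naming)."""
    env = os.environ if env is None else env
    return {name: env.get(name.upper(), "") for name in SETTING_NAMES}


def missing_settings(settings):
    """Return the names of the settings that are still empty."""
    return [name for name, value in settings.items() if not value]


settings = load_settings()

# Expose the values under the variable names that the configuration cell uses.
wxd_hms_endpoint = settings["wxd_hms_endpoint"]
wxd_hms_username = settings["wxd_hms_username"]
wxd_hms_password = settings["wxd_hms_password"]
source_bucket_endpoint = settings["source_bucket_endpoint"]
source_bucket_access_key = settings["source_bucket_access_key"]
source_bucket_secret_key = settings["source_bucket_secret_key"]
wxd_bucket_endpoint = settings["wxd_bucket_endpoint"]
wxd_bucket_access_key = settings["wxd_bucket_access_key"]
wxd_bucket_secret_key = settings["wxd_bucket_secret_key"]

unset = missing_settings(settings)
if unset:
    print("Set these values before configuring Spark:", ", ".join(unset))
```

Reading the credentials from the environment keeps keys and passwords out of the saved notebook. You can also assign the values directly in a cell if that suits your setup better.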

<a id="conf"></a>
## Configuring IBM Analytics Engine


To connect to watsonx.data from IBM Analytics Engine, configure the following watsonx.data and Spark details in the Analytics Engine instance.

In [None]:
# Configure Spark for watsonx.data (wxd)
from pyspark.sql import SparkSession

# Reuse the configuration of the running session, then stop the session so
# that it can be re-created with the watsonx.data settings.
conf = spark.sparkContext.getConf()
spark.stop()

conf.setAll([
    # Iceberg catalog named `lakehouse`, backed by the watsonx.data Hive Metastore
    ("spark.sql.catalogImplementation", "hive"),
    ("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"),
    ("spark.sql.iceberg.vectorization.enabled", "false"),
    ("spark.sql.catalog.lakehouse", "org.apache.iceberg.spark.SparkCatalog"),
    ("spark.sql.catalog.lakehouse.type", "hive"),
    ("spark.sql.catalog.lakehouse.uri", wxd_hms_endpoint),
    # Hive Metastore authentication and SSL settings
    ("spark.hive.metastore.client.auth.mode", "PLAIN"),
    ("spark.hive.metastore.client.plain.username", wxd_hms_username),
    ("spark.hive.metastore.client.plain.password", wxd_hms_password),
    ("spark.hive.metastore.use.SSL", "true"),
    ("spark.hive.metastore.truststore.type", "JKS"),
    ("spark.hive.metastore.truststore.path", "file:///opt/ibm/jdk/lib/security/cacerts"),
    ("spark.hive.metastore.truststore.password", "changeit"),
    # Per-bucket S3A settings; replace the bucket names with your own
    ("spark.hadoop.fs.s3a.bucket.lakehouse-iae.endpoint", source_bucket_endpoint),
    ("spark.hadoop.fs.s3a.bucket.lakehouse-iae.access.key", source_bucket_access_key),
    ("spark.hadoop.fs.s3a.bucket.lakehouse-iae.secret.key", source_bucket_secret_key),
    ("spark.hadoop.fs.s3a.bucket.serverless-demo-bucket.endpoint", wxd_bucket_endpoint),
    ("spark.hadoop.fs.s3a.bucket.serverless-demo-bucket.access.key", wxd_bucket_access_key),
    ("spark.hadoop.fs.s3a.bucket.serverless-demo-bucket.secret.key", wxd_bucket_secret_key),
])

# Start a new session with the updated configuration
spark = SparkSession.builder.config(conf=conf).getOrCreate()
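
The bucket credentials in the preceding cell use Hadoop S3A per-bucket settings, whose keys follow the pattern `spark.hadoop.fs.s3a.bucket.<bucket_name>.<option>`. As a sketch, a small helper can generate the three entries for one bucket, which makes it easier to substitute your own bucket names. The helper `s3a_bucket_conf` and the sample values are hypothetical and not part of the original notebook.

```python
def s3a_bucket_conf(bucket, endpoint, access_key, secret_key):
    """Build the per-bucket S3A configuration pairs for one bucket."""
    prefix = f"spark.hadoop.fs.s3a.bucket.{bucket}"
    return [
        (f"{prefix}.endpoint", endpoint),
        (f"{prefix}.access.key", access_key),
        (f"{prefix}.secret.key", secret_key),
    ]


# Placeholder values for illustration only
pairs = s3a_bucket_conf("my-bucket", "s3.example.com", "ACCESS", "SECRET")
for key, _ in pairs:
    print(key)
```

The returned list has the same shape as the entries passed to `conf.setAll(...)`, so it can be concatenated with the other settings before the session is re-created.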

<a id="lb"></a>
## Listing watsonx.data database


Use the `list_databases` function to list the existing databases in watsonx.data.

In [None]:
def list_databases(spark):
    # List the databases in the lakehouse catalog
    spark.sql("show databases from lakehouse").show()

In [None]:
list_databases(spark)

<a id="cdb"></a>
## Creating watsonx.data database


Use the `create_database` function to create a new database named `demodb` inside the watsonx.data catalog `lakehouse`.

In [None]:
def create_database(spark):
    # Create a database in the lakehouse catalog
    spark.sql("create database if not exists lakehouse.demodb LOCATION 's3a://lakehouse-iae/'")

create_database(spark)
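
The statement in `create_database` hard-codes the catalog, the database name, and the bucket. The following sketch shows one way to build the same statement from parameters. The helper `create_database_sql` is hypothetical, and the identifier check is a simple precaution chosen for this example, not a rule taken from watsonx.data.

```python
import re

# Accept only plain identifiers so that the names can be embedded in SQL safely.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def create_database_sql(catalog, database, bucket, prefix=""):
    """Return a CREATE DATABASE statement whose location is an S3A bucket path."""
    for name in (catalog, database):
        if not _IDENTIFIER.fullmatch(name):
            raise ValueError(f"Unsupported identifier: {name!r}")
    location = f"s3a://{bucket}/{prefix.strip('/')}"
    if not location.endswith("/"):
        location += "/"
    return f"create database if not exists {catalog}.{database} LOCATION '{location}'"


statement = create_database_sql("lakehouse", "demodb", "lakehouse-iae")
print(statement)
# With a live session: spark.sql(statement)
```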

In [None]:
list_databases(spark)

<a id="to"></a>
## Table operations

This notebook creates an Iceberg table in watsonx.data by using the `basic_iceberg_table_operations` function. You can also create tables from data files in Parquet and CSV formats by using the `create_table_from_parquet_data` and `ingest_from_csv_temp_table` functions.

### Performing basic table operations in Iceberg

The `demodb` database is configured to store all its data and metadata in the Cloud Object Storage (COS) bucket. The `basic_iceberg_table_operations` function creates an Iceberg table named `testTable` in this database, inserts data into the table, and runs a basic query.

In [None]:
def basic_iceberg_table_operations(spark):
    # demonstration: Create a basic Iceberg table, insert some data and then query table
    spark.sql("create table if not exists lakehouse.demodb.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show()
    spark.sql("insert into lakehouse.demodb.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)")
    spark.sql("select * from lakehouse.demodb.testTable").show()

basic_iceberg_table_operations(spark)

### Ingesting data in parquet format to watsonx.data

To run the use case, download sample Parquet data (for example, six months of taxi data for the year 2022) from the following link:
* [Sample parquet file](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page)

The `create_table_from_parquet_data` function ingests data in Parquet format from a source COS bucket into a watsonx.data table within the `demodb` database. The sample data, `yellow_tripdata_2022-01.parquet`, is inserted from the source COS bucket into the watsonx.data table `yellow_taxi_2022`.

In [None]:
def create_table_from_parquet_data(spark):
    # load parquet data into a dataframe (Parquet files carry their own schema)
    df = spark.read.parquet("s3a://serverless-demo-bucket/nyc-taxi/yellow_tripdata_2022-01.parquet")
    # write the dataframe into an Iceberg table
    df.writeTo("lakehouse.demodb.yellow_taxi_2022").create()
    # describe the table created
    spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)
    # count the rows in the table and print the result
    print(spark.sql('select * from lakehouse.demodb.yellow_taxi_2022').count())

In [None]:
create_table_from_parquet_data(spark)

Run the following code to display the data from the watsonx.data table, `yellow_taxi_2022`. 

In [None]:
spark.sql('select * from lakehouse.demodb.yellow_taxi_2022').show()

### Ingesting data in CSV format to watsonx.data

To run the use case, download a sample CSV file (for example, `zipcodes.csv`) from the following link:

* [Sample CSV file](https://raw.githubusercontent.com/spark-examples/spark-scala-examples/3ea16e4c6c1614609c2bd7ebdffcee01c0fe6017/src/main/resources/zipcodes.csv)

The `ingest_from_csv_temp_table` function ingests data in CSV format from a source COS bucket into a watsonx.data table within the `demodb` database. The sample data, `zipcodes.csv`, is inserted from the source COS bucket into the watsonx.data table `zipcodes`.

In [None]:
def ingest_from_csv_temp_table(spark):
    # load csv data into a dataframe
    csvDF = spark.read.option("header",True).csv("s3a://serverless-demo-bucket/zipcodes.csv")
    csvDF.createOrReplaceTempView("tempCSVTable")
    # load temporary table into an Iceberg table
    spark.sql('create or replace table lakehouse.demodb.zipcodes using iceberg as select * from tempCSVTable')
    # describe the table created
    spark.sql('describe table lakehouse.demodb.zipcodes').show(25)
    # query the table
    spark.sql('select * from lakehouse.demodb.zipcodes').show()

In [None]:
ingest_from_csv_temp_table(spark)

### Analyzing monthly data

Use the `ingest_monthly_data` function to append monthly data (either CSV or Parquet) to the watsonx.data table. In this way, you can update the watsonx.data table with new data from the source bucket every month. The following example uses the Parquet sample data.

In [None]:
def ingest_monthly_data(spark):
    # load the parquet files for February to June 2022
    df_feb = spark.read.parquet("s3a://serverless-demo-bucket/nyc-taxi/yellow_tripdata_2022-02.parquet")
    df_march = spark.read.parquet("s3a://serverless-demo-bucket/nyc-taxi/yellow_tripdata_2022-03.parquet")
    df_april = spark.read.parquet("s3a://serverless-demo-bucket/nyc-taxi/yellow_tripdata_2022-04.parquet")
    df_may = spark.read.parquet("s3a://serverless-demo-bucket/nyc-taxi/yellow_tripdata_2022-05.parquet")
    df_june = spark.read.parquet("s3a://serverless-demo-bucket/nyc-taxi/yellow_tripdata_2022-06.parquet")

    # combine the monthly dataframes and append them to the existing Iceberg table
    df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june)
    df_q1_q2.write.insertInto("lakehouse.demodb.yellow_taxi_2022")

In [None]:
ingest_monthly_data(spark)
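
The five read calls in `ingest_monthly_data` differ only in the month number, so the file paths can be generated instead of typed out. The following is a minimal sketch. The helper `monthly_parquet_paths` is hypothetical, and the bucket name and the `nyc-taxi` prefix are taken from the paths used in this notebook.

```python
def monthly_parquet_paths(bucket, year, months, prefix="nyc-taxi"):
    """Return the S3A paths of the monthly yellow taxi Parquet files."""
    return [
        f"s3a://{bucket}/{prefix}/yellow_tripdata_{year}-{month:02d}.parquet"
        for month in months
    ]


# February (2) to June (6) of 2022
paths = monthly_parquet_paths("serverless-demo-bucket", 2022, range(2, 7))
for path in paths:
    print(path)

# With a live session, all files can be read in one call:
#   df = spark.read.parquet(*paths)
#   df.write.insertInto("lakehouse.demodb.yellow_taxi_2022")
```

Reading several paths in a single `spark.read.parquet(...)` call assumes that all the files share the same schema. If the monthly files differ, reading them one by one, as in `ingest_monthly_data`, makes the mismatch easier to locate.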

Run the following code to check the total number of records in the watsonx.data table `yellow_taxi_2022`.

In [None]:
spark.sql('select * from lakehouse.demodb.yellow_taxi_2022').count()

### Table maintenance

Table maintenance helps maintain the performance of tables in watsonx.data. Iceberg provides built-in table maintenance procedures that let you perform powerful table optimizations in a declarative way. The following sample demonstrates some table maintenance operations by using Spark. For more information about the Iceberg Spark table maintenance operations, see [Table Operations](https://iceberg.apache.org/docs/latest/spark-procedures/).

The following are some of the table maintenance operations:

* Querying the table and listing the underlying data files
* Adjusting the file size to compact the data
* Removing unused data files


In [None]:
def perform_table_maintenance_operations(spark):
    # Query the metadata files table to list underlying data files
    spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()

    # The table contains many small files; compact them into files of 200 MB each by using the
    # `rewrite_data_files` Iceberg Spark procedure
    spark.sql("CALL lakehouse.system.rewrite_data_files(table => 'demodb.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show()

    # Query the files metadata table again to list the underlying data files; 6 files are compacted
    # to 3 files
    spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()

    # List all the snapshots
    spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()

    # Expire the earlier snapshots; only the latest one, with the compacted data, is required.
    # Order by commit time so that the last row is the latest snapshot.
    latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots ORDER BY committed_at").tail(1)[0].committed_at
    print(latest_snapshot_committed_at)
    spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'demodb.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show()

    # List the snapshots again; only one snapshot is left
    spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()

    # Remove orphan data files
    spark.sql("CALL lakehouse.system.remove_orphan_files(table => 'demodb.yellow_taxi_2022')").show(truncate=False)

    # Rewrite manifest files
    spark.sql("CALL lakehouse.system.rewrite_manifests('demodb.yellow_taxi_2022')").show()

In [None]:
# Query the metadata files table to list underlying data files
spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()


In [None]:
# The table contains many small files; compact them into files of 200 MB each by using the
# `rewrite_data_files` Iceberg Spark procedure
spark.sql("CALL lakehouse.system.rewrite_data_files(table => 'demodb.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show()
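
The value `209715200` passed as `target-file-size-bytes` is 200 MB expressed in bytes (200 × 1024 × 1024). As a sketch, the conversion and the procedure call can be generated from a size in megabytes. The helper `rewrite_data_files_sql` is hypothetical and only builds the statement text.

```python
def rewrite_data_files_sql(catalog, table, target_mb):
    """Build a rewrite_data_files call with the target file size given in MB."""
    target_bytes = target_mb * 1024 * 1024  # 1 MB = 1024 * 1024 bytes
    return (
        f"CALL {catalog}.system.rewrite_data_files("
        f"table => '{table}', "
        f"options => map('target-file-size-bytes','{target_bytes}'))"
    )


compaction_sql = rewrite_data_files_sql("lakehouse", "demodb.yellow_taxi_2022", 200)
print(compaction_sql)
# With a live session: spark.sql(compaction_sql).show()
```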


In [None]:
perform_table_maintenance_operations(spark)
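
The function takes the commit time of the latest snapshot and expires every snapshot that is older. The selection can be made explicit by taking the maximum `committed_at` value. The following is a minimal sketch that uses made-up snapshot records. The IDs, timestamps, and helper names are illustrative only.

```python
from datetime import datetime


def latest_snapshot(snapshots):
    """Return the snapshot record with the most recent commit time."""
    return max(snapshots, key=lambda snap: snap["committed_at"])


def expire_snapshots_sql(catalog, table, older_than, retain_last=1):
    """Build an expire_snapshots call for snapshots older than a timestamp."""
    timestamp = older_than.strftime("%Y-%m-%d %H:%M:%S.%f")
    return (
        f"CALL {catalog}.system.expire_snapshots("
        f"table => '{table}', "
        f"older_than => TIMESTAMP '{timestamp}', "
        f"retain_last => {retain_last})"
    )


# Made-up snapshot records, deliberately out of order
sample_snapshots = [
    {"snapshot_id": 102, "committed_at": datetime(2024, 1, 1, 12, 30), "operation": "replace"},
    {"snapshot_id": 100, "committed_at": datetime(2024, 1, 1, 10, 0), "operation": "append"},
    {"snapshot_id": 101, "committed_at": datetime(2024, 1, 1, 11, 15), "operation": "append"},
]

newest = latest_snapshot(sample_snapshots)
expire_sql = expire_snapshots_sql("lakehouse", "demodb.yellow_taxi_2022", newest["committed_at"])
print(expire_sql)
```

With a live session, the records could come from the `snapshots` metadata table, for example by calling `asDict()` on each row that `collect()` returns.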

### Modifying schema in watsonx.data

Use the `evolve_schema` function to modify the schema of the watsonx.data table `yellow_taxi_2022` by adding a column.

In [None]:
def evolve_schema(spark):
    # demonstration: Schema evolution
    # Add column fare_per_mile to the table
    spark.sql('ALTER TABLE lakehouse.demodb.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)')
    # describe the table
    spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)

In [None]:
evolve_schema(spark)
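
Rows that were written before the schema change return `NULL` for the new column, because Iceberg adds the column without rewriting the existing data files. The following sketch shows one way the column could be populated. It assumes that the table contains the `fare_amount` and `trip_distance` columns from the TLC yellow taxi data, and that the Iceberg SQL extensions configured earlier make `UPDATE` available. The Python function mirrors the SQL expression so that the guard against non-positive distances is easy to check.

```python
def fare_per_mile(fare_amount, trip_distance):
    """Return the fare per mile, or None when it cannot be computed."""
    if fare_amount is None or trip_distance is None or trip_distance <= 0:
        return None
    return fare_amount / trip_distance


# Assumed column names: fare_amount and trip_distance
UPDATE_SQL = (
    "UPDATE lakehouse.demodb.yellow_taxi_2022 "
    "SET fare_per_mile = fare_amount / trip_distance "
    "WHERE trip_distance > 0"
)

print(fare_per_mile(12.5, 2.5))  # regular trip
print(fare_per_mile(8.0, 0.0))   # zero distance is skipped
# With a live session: spark.sql(UPDATE_SQL)
```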

<a id="cln"></a>
### Cleaning database


After you finish the tutorial, you no longer require the tables in the `demodb` database. You can use the `clean_database` function to remove the tables and the database.

In [None]:
def clean_database(spark):
    # clean-up the demo database
    spark.sql('drop table if exists lakehouse.demodb.testTable purge')
    spark.sql('drop table if exists lakehouse.demodb.zipcodes purge')
    spark.sql('drop table if exists lakehouse.demodb.yellow_taxi_2022 purge')
    spark.sql('drop database if exists lakehouse.demodb cascade')

In [None]:
clean_database(spark)

In [None]:
list_databases(spark)

<a id="summ"></a>
## Summary

This notebook shows how watsonx.data integrates with IBM Analytics Engine to support use cases such as data ingestion, schema evolution, and table maintenance. It also helps you get started quickly with Spark use cases.

Copyright © 2024 IBM. This notebook and its source code are released under the terms of the Apache License.