# Spark Catalog

Let us say **spark** is an object of type **SparkSession**. It has an attribute called **catalog**.

We can access the catalog using **spark.catalog** and create permanent or temporary tables or views on top of the data in a DataFrame.

Metadata such as table names, column names, data types, etc. for the permanent tables or views is stored in the Metastore. We can access that metadata using **spark.catalog**, which is exposed as part of the SparkSession object. **spark.catalog** also provides details about the temporary views that have been created. The metadata of these temporary views is not stored in the Spark Metastore.

Permanent tables are created in databases in the Spark Metastore. If no database is specified, the tables are created in the **default** database.

Here are some of the tasks that can be performed using the **spark.catalog** object:
1. Check the current database and switch to a different database.
2. Create permanent tables in the Metastore.
3. Create or drop temporary views.
4. Register functions.

All of the above tasks can also be performed using SQL-style commands passed to **spark.sql**.
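
The inspection tasks listed above can be sketched as one small helper. This is a minimal sketch, assuming `spark` is an existing SparkSession such as the one created in the next cell; `catalog_summary` is a hypothetical name, not part of PySpark, and it only wraps the standard `currentDatabase`, `listDatabases` and `listTables` calls.

```python
def catalog_summary(spark):
    """Collect a few facts about the session's catalog into a plain dict.

    `spark` is assumed to be an existing SparkSession; the helper name is
    hypothetical and only wraps standard spark.catalog calls.
    """
    catalog = spark.catalog
    # Tables and temporary views visible from the current database.
    tables = catalog.listTables()
    return {
        "current_database": catalog.currentDatabase(),
        "databases": sorted(db.name for db in catalog.listDatabases()),
        "tables": sorted(t.name for t in tables if not t.isTemporary),
        "temporary_views": sorted(t.name for t in tables if t.isTemporary),
    }
```

Calling `catalog_summary(spark)` right after `spark.catalog.setCurrentDatabase(...)` is a quick way to confirm which database the session is attached to and what it contains.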

In [2]:
from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    enableHiveSupport(). \
    appName(f'{username} | Python - Spark Metastore'). \
    master('yarn'). \
    getOrCreate()

In [2]:
spark.catalog

<pyspark.sql.catalog.Catalog at 0x7ff9d90a9b70>

In [None]:
# Run this cell to see all of the available commands

help(spark.catalog)

# Creating Metastore Tables using **catalog**

DataFrames can be written into Metastore tables using APIs such as **saveAsTable** and **insertInto**, available as part of **write** on DataFrame objects.

Databases can be created using **spark.sql("CREATE DATABASE *database_name*")**. We can list databases using **spark.sql** or **spark.catalog.listDatabases()**.

A new table can be created from a DataFrame using **saveAsTable**. An empty table can be created using **spark.catalog.createTable** or **spark.catalog.createExternalTable** (the latter is deprecated since Spark 2.2 in favor of **createTable** with a path).

We can prefix the database name to write data into tables belonging to a particular database. If the database is not specified, the table is created in the session's current database, which is **default** unless changed. The current session can be attached to a specific database using **spark.catalog.setCurrentDatabase**.

The **saveAsTable** method supports the modes append, overwrite, ignore and error (also spelled errorifexists). The default mode is error. The **insertInto** method supports append and overwrite; its default is append.
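
The database prefix and the save modes can be combined in a small wrapper. This is a sketch under stated assumptions: `save_table` and `qualified_name` are hypothetical helper names, `df` is any existing DataFrame, and the accepted mode strings are those of `DataFrameWriter` (which also accepts `ignore` and `errorifexists` in addition to the three modes named above).

```python
# Mode strings accepted by DataFrameWriter; "error" and "errorifexists" are synonyms.
SAVE_MODES = {"append", "overwrite", "ignore", "error", "errorifexists"}


def qualified_name(table, database=None):
    """Prefix the table with its database, e.g. demo_db.dual."""
    return f"{database}.{table}" if database else table


def save_table(df, table, database=None, mode="error"):
    """Write df as a Metastore table; mode defaults to error, as saveAsTable does."""
    if mode not in SAVE_MODES:
        raise ValueError(f"unsupported save mode: {mode!r}")
    name = qualified_name(table, database)
    df.write.saveAsTable(name, mode=mode)
    return name
```

For example, `save_table(df, "dual", database="demo_db", mode="overwrite")` would write to `demo_db.dual` regardless of the session's current database.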

In [5]:
spark.conf.set?

Signature: spark.conf.set(key, value)
Docstring:
Sets the given Spark runtime configuration property.

.. versionadded:: 2.0
File:      /opt/spark3/python/pyspark/sql/conf.py
Type:      method


In [6]:
spark.sql("""CREATE DATABASE demo_db""")

In [4]:
spark.catalog.setCurrentDatabase("demo_db")

In [5]:
spark.catalog.listDatabases()

[Database(name='default', description='Default Hive database', locationUri='hdfs://localhost:9000/user/hive/warehouse'),
 Database(name='demo_db', description='', locationUri='file:/home/evivancovid/pyspark-exercises/spark-warehouse/demo_db.db'),
 Database(name='nyse_db', description='', locationUri='hdfs://localhost:9000/user/hive/warehouse/nyse_db.db')]

In [2]:
spark.sql("""SHOW DATABASES""")

namespace
default
demo_db
nyse_db


In [6]:
# Create a DataFrame with one column named dummy and one row with the value X.

l = [("X", )]
df = spark.createDataFrame(l, schema = "dummy STRING")

In [7]:
spark.catalog.listTables()

[Table(name='dual', database='demo_db', description=None, tableType='MANAGED', isTemporary=False)]

In [15]:
df.show()

+-----+
|dummy|
+-----+
|    X|
+-----+



In [16]:
# Create a table named dual from the above DataFrame in the current database.

df.write.saveAsTable("dual", mode = "overwrite")

In [17]:
spark.catalog.listTables()

[Table(name='dual', database='demo_db', description=None, tableType='MANAGED', isTemporary=False)]

In [18]:
spark.read.table("dual").show()

+-----+
|dummy|
+-----+
|    X|
+-----+



In [19]:
spark.sql("DROP TABLE dual")

### Create empty table and insert data into it

In [7]:
df.show()

+-----+
|dummy|
+-----+
|    X|
+-----+



In [8]:
schema = df.schema

In [9]:
spark.catalog.createTable("dual", schema = schema)

dummy


In [10]:
df.write.insertInto("dual")

In [11]:
spark.read.table("dual").show()

+-----+
|dummy|
+-----+
|    X|
+-----+



# Inferring Schema for Tables

When we create a table using **spark.catalog.createTable** or **spark.catalog.createExternalTable**, the schema needs to be specified.
The schema of an existing DataFrame is available as **df.schema** and can be passed as a **StructType** object while creating the table. A StructType object takes a list of objects of type **StructField**. A **StructField** object is built using a column name and a data type.

# Define Schema for Tables using StructType

Instead of taking the schema from a DataFrame, we can define it explicitly as a **StructType** object and pass it while creating the table. A StructType object takes a list of objects of type **StructField**, each built using a column name and a data type. All data types are available under **pyspark.sql.types**. We need to pass the table name and schema when using **spark.catalog.createTable**, and we have to pass the path along with the name and schema when using **spark.catalog.createExternalTable**.
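
Both ways of obtaining a schema end in the same **StructType** object, so it can be inspected or reused the same way. A minimal sketch, assuming `spark` is an existing SparkSession and `df` an existing DataFrame; `describe_schema` and `create_empty_table_like` are hypothetical helper names built only on `StructType.fields`, `StructField` attributes and `spark.catalog.createTable`.

```python
def describe_schema(schema):
    """Return (column name, type, nullable) for each StructField in a StructType."""
    return [
        (field.name, field.dataType.simpleString(), field.nullable)
        for field in schema.fields
    ]


def create_empty_table_like(spark, df, table):
    """Create an empty Metastore table that reuses the schema of an existing DataFrame."""
    return spark.catalog.createTable(table, schema=df.schema)
```

`describe_schema(df.schema)` is handy for checking that a hand-written StructType matches the DataFrame that will later be inserted into the table.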



In [20]:
employees = [
    (1, "Scott", "Tiger", 1000.0, "united states"),
    (2, "Henry", "Ford", 1250.0, "India"),
    (3, "Nick", "Junior", 750.0, "united KINGDOM"),
    (4, "Bill", "Gomes", 1500.0, "AUSTRALIA")
]

employeesDF = spark. \
    createDataFrame(employees,
                    schema="""employee_id INT, first_name STRING, 
                    last_name STRING, salary FLOAT, nationality STRING""")

In [21]:
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, FloatType

# Build a StructType object from a list of StructField objects.

employeesSchema = StructType([
    StructField("employee_id", IntegerType()),
    StructField("first_name", StringType()),
    StructField("last_name", StringType()),
    StructField("salary", FloatType()),
    StructField("nationality", StringType())
])

# Create the table by passing the StructType object as its schema.

spark.catalog.createTable("employees", schema = employeesSchema)

AnalysisException: Table employees already exists.
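
The `AnalysisException` above is raised because a table named `employees` was already present when the cell ran. One way to make the cell safe to re-run is to check the catalog first. This is a sketch, not the only option: `create_table_if_missing` is a hypothetical helper, and the case-insensitive comparison rests on the assumption that the Metastore stores table names in lower case.

```python
def create_table_if_missing(spark, table, schema):
    """Create an empty table unless the current database already has one with that name.

    Returns True when the table was created and False when it already existed.
    """
    # Assumption: compare names case-insensitively, since the Metastore
    # is expected to store table names in lower case.
    existing = {t.name.lower() for t in spark.catalog.listTables()}
    if table.lower() in existing:
        return False
    spark.catalog.createTable(table, schema=schema)
    return True
```

With this helper, `create_table_if_missing(spark, "employees", employeesSchema)` would return False instead of raising when the table is already there.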

# Inserting into Existing Tables

We can use **insertInto** with modes such as **append** and **overwrite** to insert data into an existing table. If we do not specify a mode, the default is **append**.
When we use **insertInto** the following happens:
1. If the table doesn’t exist, **insertInto** will throw an exception.
2. If the table exists, by default the data will be appended.
3. We can alter the above behaviour by using the argument **overwrite**, which is False by default; we can pass True to replace the existing data.
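
One detail worth keeping in mind is that **insertInto** resolves columns by position, not by name, so a DataFrame whose columns are in a different order than the table's can end up with values in the wrong columns. A minimal sketch of a guard, assuming `spark` is an existing SparkSession and the DataFrame contains every column of the target table; `insert_by_position` is a hypothetical helper name.

```python
def insert_by_position(spark, df, table, overwrite=False):
    """Insert df into an existing table after aligning its column order with the table's.

    insertInto matches columns by position rather than by name, so the
    DataFrame is first re-selected in the order of the target table's columns.
    """
    target_columns = spark.read.table(table).columns
    df.select(*target_columns).write.insertInto(table, overwrite=overwrite)
    return target_columns
```

The call `insert_by_position(spark, employeesDF, "employees", overwrite=True)` would behave like the plain `insertInto` call in the next cell, with the extra column alignment step.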


In [25]:
# Insert the employees DataFrame into the existing employees table (created in the previous topic), replacing its data

employeesDF.write.insertInto("employees", overwrite=True)

# Read and Process data from Metastore Tables

Using the DataFrame APIs, we can read Metastore tables with **spark.read.table**.
We can prefix the database name to read tables belonging to a particular database.
When we **read** a table, the result is a **DataFrame**. Once we have a DataFrame object, we can use functions such as **filter**, **where**, **groupBy**, etc.
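
Reading with a database prefix and then filtering can be sketched as follows. The helper name `read_and_filter` is hypothetical, `spark` is assumed to be an existing SparkSession, and the condition is passed as a SQL expression string, which `DataFrame.filter` accepts.

```python
def read_and_filter(spark, table, database=None, condition=None):
    """Read a Metastore table, optionally from a named database, and filter it."""
    # A database prefix makes the read independent of the current database.
    name = f"{database}.{table}" if database else table
    df = spark.read.table(name)
    return df.filter(condition) if condition else df
```

The result is an ordinary DataFrame, so it can be chained further, for example `read_and_filter(spark, "nyse2000", database="evivancovid_nyvenv", condition="volume > 1000000").groupBy("ticker").count()`.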


In [8]:
spark.sql("""CREATE DATABASE IF NOT EXISTS evivancovid_nyvenv""")

In [9]:
spark.sql("""SHOW DATABASES""")

namespace
default
demo_db
evivancovid_nyvenv
nyse_db


In [2]:
spark.catalog.setCurrentDatabase("""evivancovid_nyvenv""")

In [3]:
spark.catalog.currentDatabase()

'evivancovid_nyvenv'

In [4]:
nyse_2000_path = "/public/data/nyse_years/NYSE_2000.txt"

In [5]:
nyse_2000_df = spark.read.csv(nyse_2000_path, sep = ",", header = False, schema = "ticker STRING, trade_date STRING, open_price FLOAT, close_price FLOAT, high FLOAT, low FLOAT, volume INT")

In [6]:
nyse_2000_df.show()

+------+----------+----------+-----------+------+------+-------+
|ticker|trade_date|open_price|close_price|  high|   low| volume|
+------+----------+----------+-----------+------+------+-------+
|     A|  20000103|     78.75|      78.94| 67.38|  72.0|3343700|
|    AA|  20000103|     123.0|     125.34|120.57|121.41|1551600|
|   ABB|  20000103|     24.66|      24.66| 24.66| 24.66|      0|
|   ABC|  20000103|     3.875|     3.9375|3.8125|  3.89| 696200|
|   ABM|  20000103|     10.25|      10.31|  10.0| 10.16| 241600|
|   ABT|  20000103|     35.25|       36.0| 34.75|  35.0|4774100|
|   ABX|  20000103|     17.56|      18.19| 17.44| 17.69|1509900|
|   ACP|  20000103|      7.56|        8.0|  7.56|  7.94|  12600|
|   ACV|  20000103|     17.04|      17.13|  16.5| 16.75|  70950|
|   ADC|  20000103|     14.37|      14.37| 13.62| 13.94|  24500|
|   ADM|  20000103|     10.89|      10.94| 10.77| 10.89| 985413|
|   ADX|  20000103|     22.34|       22.5| 22.16| 22.28| 117450|
|   AED|  20000103|      

In [7]:
nyse_2000_df.write.saveAsTable("nyse2000")

In [10]:
spark.sql("SHOW Tables")

database,tableName,isTemporary
evivancovid_nyvenv,nyse2000,False


In [13]:
nyse_2000 = spark.read.table("nyse2000")

In [14]:
nyse_2000.groupBy("ticker"). \
    count(). \
    show()

+------+-----+
|ticker|count|
+------+-----+
|     K|  260|
|   LEN|  260|
|   MHF|  260|
|   PKE|  260|
|   TLI|  260|
|   CCK|  260|
|   CRS|  260|
|   GIS|  260|
|   HAE|  260|
|   RRD|  260|
|   TRK|  260|
|   AIV|  260|
|   AVX|  260|
|   AVY|  260|
|   FDC|  260|
|  BF.B|  260|
|   DTF|  260|
|   MGF|  260|
|   MMM|  260|
|   PKI|  260|
+------+-----+
only showing top 20 rows



# Creating Partitioned Tables

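We can create partitioned tables as part of Spark Metastore tables. There are some challenges when creating partitioned tables directly using **spark.catalog.createTable**. However, if the data directory already follows the partitioned layout (one **column=value** subdirectory per partition), we can create the table on top of it. As the cells below show, such a table reports no partitions and no rows until **spark.catalog.recoverPartitions** registers the partition directories in the Metastore.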
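
To make the directory layout concrete, here is a pure-Python sketch (no Spark required) of how a partition value is encoded in a path and how the `trade_month` value used in the cells below is derived from `trade_date`. Both function names are hypothetical and serve only as illustration.

```python
def partition_values(path):
    """Extract partition columns encoded as column=value segments of a path."""
    values = {}
    for segment in path.split("/"):
        if "=" in segment:
            column, _, value = segment.partition("=")
            values[column] = value
    return values


def trade_month(trade_date):
    """Python equivalent of substring(col("trade_date"), 1, 6).

    Spark's substring uses 1-based positions, so (1, 6) means the first
    six characters, i.e. the yyyyMM part of a yyyyMMdd string.
    """
    return trade_date[:6]
```

For instance, `partition_values("/public/data/nyse_parts/trade_month=200001")` yields a dictionary with the single key `trade_month`, which is the column Spark adds back when reading the parent directory.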

In [20]:
from pyspark.sql.functions import col, substring

In [23]:
spark.read.csv(nyse_2000_path, 
               header = False, 
               schema = "ticker STRING, trade_date STRING, open_price FLOAT, close_price FLOAT, high FLOAT, low FLOAT, volume INT"). \
    withColumn("trade_month", substring(col("trade_date"),1,6)). \
    write. \
    partitionBy("trade_month"). \
    parquet("/public/data/nyse_parts")

In [24]:
%%sh 

hdfs dfs -ls -R /public/data/nyse_parts

-rw-r--r--   1 evivancovid supergroup          0 2022-05-19 18:29 /public/data/nyse_parts/_SUCCESS
drwxr-xr-x   - evivancovid supergroup          0 2022-05-19 18:29 /public/data/nyse_parts/trade_month=200001
-rw-r--r--   1 evivancovid supergroup     376816 2022-05-19 18:29 /public/data/nyse_parts/trade_month=200001/part-00000-5cc4c68c-7c7f-4bdb-ba0c-9de2955dab3e.c000.snappy.parquet
drwxr-xr-x   - evivancovid supergroup          0 2022-05-19 18:29 /public/data/nyse_parts/trade_month=200002
-rw-r--r--   1 evivancovid supergroup     358443 2022-05-19 18:29 /public/data/nyse_parts/trade_month=200002/part-00000-5cc4c68c-7c7f-4bdb-ba0c-9de2955dab3e.c000.snappy.parquet
drwxr-xr-x   - evivancovid supergroup          0 2022-05-19 18:29 /public/data/nyse_parts/trade_month=200003
-rw-r--r--   1 evivancovid supergroup     421018 2022-05-19 18:29 /public/data/nyse_parts/trade_month=200003/part-00000-5cc4c68c-7c7f-4bdb-ba0c-9de2955dab3e.c000.snappy.parquet
drwxr-xr-x   - evivancovid supergroup      

In [27]:
spark.read.parquet("/public/data/nyse_parts/trade_month=200010").show()

+------+----------+----------+-----------+------+-------+-------+
|ticker|trade_date|open_price|close_price|  high|    low| volume|
+------+----------+----------+-----------+------+-------+-------+
|     A|  20001002|     49.13|      51.38| 49.13|  50.56|2151800|
|    AA|  20001002|     75.57|      76.68| 72.75|   73.5|3354900|
|   ABB|  20001002|     19.45|      19.45| 19.45|  19.45|      0|
|   ABC|  20001002|    11.875|    12.2975| 11.75|12.2975|3649400|
|   ABM|  20001002|     13.47|      13.69| 13.41|  13.47|  30400|
|   ABT|  20001002|     47.94|       48.0| 47.31|  47.94|2545700|
|   ABX|  20001002|     15.12|      15.19| 14.87|  14.94| 766900|
|   ACP|  20001002|      9.06|       9.12|  9.06|   9.12|   2500|
|   ACV|  20001002|     19.29|      19.37| 18.96|  19.25|  75000|
|   ADC|  20001002|     14.87|       15.0| 14.69|   15.0|   5000|
|   ADM|  20001002|      8.39|       8.69|  8.21|   8.69|3271169|
|   ADX|  20001002|     25.66|      25.75|  25.5|  25.53|  12000|
|   AED|  

In [28]:
spark. \
    read. \
    parquet("/public/data/nyse_parts"). \
    show()

+------+----------+----------+-----------+------+-------+--------+-----------+
|ticker|trade_date|open_price|close_price|  high|    low|  volume|trade_month|
+------+----------+----------+-----------+------+-------+--------+-----------+
|     A|  20000301|     104.5|     111.75|104.25| 109.13|  742400|     200003|
|    AA|  20000301|    102.66|    106.215|99.195|104.625| 2390300|     200003|
|   ABB|  20000301|      22.0|       22.0|  22.0|   22.0|     800|     200003|
|   ABC|  20000301|     3.655|       3.78|  3.64| 3.7175|  342900|     200003|
|   ABM|  20000301|      12.6|      12.72| 12.47|   12.6|  241000|     200003|
|   ABT|  20000301|     33.13|       34.0| 32.75|  33.81| 3832400|     200003|
|   ABX|  20000301|     16.62|      16.62| 16.37|  16.62|  864600|     200003|
|   ACP|  20000301|      7.69|       7.69|  7.69|   7.69|    1700|     200003|
|   ACV|  20000301|     14.33|      14.41| 13.91|  14.29|  103200|     200003|
|   ADC|  20000301|     13.56|      13.87|  13.5|  1

In [29]:
spark. \
    catalog. \
    createTable('nyseParts',
                path="/public/data/nyse_parts",
                source='parquet'
               )

ticker,trade_date,open_price,close_price,high,low,volume,trade_month


In [30]:
spark.read.table("nyseParts").show()

+------+----------+----------+-----------+----+---+------+-----------+
|ticker|trade_date|open_price|close_price|high|low|volume|trade_month|
+------+----------+----------+-----------+----+---+------+-----------+
+------+----------+----------+-----------+----+---+------+-----------+



In [32]:
spark.sql("SHOW PARTITIONS nyseParts").show()

+---------+
|partition|
+---------+
+---------+



In [34]:
spark.catalog.recoverPartitions("nyseParts")

In [35]:
spark.sql("SHOW PARTITIONS nyseParts").show()

+------------------+
|         partition|
+------------------+
|trade_month=200001|
|trade_month=200002|
|trade_month=200003|
|trade_month=200004|
|trade_month=200005|
|trade_month=200006|
|trade_month=200007|
|trade_month=200008|
|trade_month=200009|
|trade_month=200010|
|trade_month=200011|
|trade_month=200012|
+------------------+



In [36]:
spark.read.table("nyseParts").show()

+------+----------+----------+-----------+------+-------+--------+-----------+
|ticker|trade_date|open_price|close_price|  high|    low|  volume|trade_month|
+------+----------+----------+-----------+------+-------+--------+-----------+
|     A|  20000301|     104.5|     111.75|104.25| 109.13|  742400|     200003|
|    AA|  20000301|    102.66|    106.215|99.195|104.625| 2390300|     200003|
|   ABB|  20000301|      22.0|       22.0|  22.0|   22.0|     800|     200003|
|   ABC|  20000301|     3.655|       3.78|  3.64| 3.7175|  342900|     200003|
|   ABM|  20000301|      12.6|      12.72| 12.47|   12.6|  241000|     200003|
|   ABT|  20000301|     33.13|       34.0| 32.75|  33.81| 3832400|     200003|
|   ABX|  20000301|     16.62|      16.62| 16.37|  16.62|  864600|     200003|
|   ACP|  20000301|      7.69|       7.69|  7.69|   7.69|    1700|     200003|
|   ACV|  20000301|     14.33|      14.41| 13.91|  14.29|  103200|     200003|
|   ADC|  20000301|     13.56|      13.87|  13.5|  1

In [37]:
spark.sql("""SELECT trade_month, count(1) FROM nyseParts GROUP BY trade_month""")

trade_month,count(1)
200001,25229
200004,24265
200009,25819
200003,27759
200010,27174
200008,28213
200006,26810
200012,26005
200007,25705
200011,27228


# Creating Temp Views

We can create a temporary view for a DataFrame using **createTempView** or **createOrReplaceTempView**.

**createOrReplaceTempView** will replace the view if it already exists.

While tables in the Metastore are permanent, views are temporary. Once the application exits, temporary views are dropped.

# Using Spark SQL

Once tables in the Metastore or temporary views are created, we can run queries against them using **spark.sql** to perform all standard transformations.
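
Putting temporary views and SQL together, the flow can be sketched as below. This is a minimal sketch, assuming `spark` is an existing SparkSession and `df` an existing DataFrame; `query_dataframe` is a hypothetical helper name, and it drops the view afterwards with `spark.catalog.dropTempView` so that nothing lingers in the session.

```python
def query_dataframe(spark, df, view_name, query):
    """Expose df as a temporary view, run a SQL query against it, then drop the view."""
    df.createOrReplaceTempView(view_name)
    try:
        # collect() brings the result rows back to the driver as a list.
        return spark.sql(query).collect()
    finally:
        spark.catalog.dropTempView(view_name)
```

As a usage example, `query_dataframe(spark, employeesDF, "employees_v", "SELECT nationality, count(1) AS employee_count FROM employees_v GROUP BY nationality")` would return one row per nationality.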

