In [0]:
import pandas as pd

In [0]:
# Raw CSV of the "50 Startups" dataset, served from a GitHub gist. The URL
# embeds a revision hash, so it presumably always resolves to the same file contents.
dataset_url = 'https://gist.githubusercontent.com/GaneshSparkz/b5662effbdae8746f7f7d8ed70c42b2d/raw/faf8b1a0d58e251f48a647d3881e7a960c3f0925/50_Startups.csv'

In [0]:
# Download and parse the CSV straight from the URL into a pandas DataFrame.
pandas_df = pd.read_csv(dataset_url)
# Last expression of the cell: shows the object's class to confirm it is a pandas frame.
type(pandas_df)

pandas.core.frame.DataFrame

In [0]:
# Turn the CSV headers into SQL-friendly identifiers: lower-case them,
# swap spaces for underscores and '&' for 'n' (e.g. "R&D Spend" -> "rnd_spend").
column_names = [
    name.lower().replace(' ', '_').replace('&', 'n')
    for name in pandas_df.columns
]
pandas_df.columns = column_names
# Cell output: the renamed column index.
pandas_df.columns

Index(['rnd_spend', 'administration', 'marketing_spend', 'state', 'profit'], dtype='object')

### Import spark dependencies

In [0]:
# Relies on a `spark` session already existing in the kernel: the explicit
# SparkSession builder call only appears in a later cell.
spark.version

'3.5.0'

In [0]:
# spark comes preloaded however these are some typical imports
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

In [0]:
# getOrCreate() hands back the already-running session when there is one,
# so this is safe even when the runtime has preloaded `spark`.
spark = SparkSession.builder.appName("Spark101").getOrCreate()

### Pandas to spark and back

In [0]:
# Convert the pandas frame into a Spark DataFrame. No schema is passed,
# so Spark derives the column types from the pandas data.
spark_df = spark.createDataFrame(pandas_df)
# Cell output: confirms the result is a pyspark.sql DataFrame.
type(spark_df)

pyspark.sql.dataframe.DataFrame

In [0]:
# Round trip back to pandas. toPandas() materialises the whole Spark
# DataFrame in driver memory, so only use it on data known to be small.
pandas_df_back = spark_df.toPandas()
# Cell output: confirms we are back to a pandas DataFrame.
type(pandas_df_back)

pandas.core.frame.DataFrame

### Create sql view from spark

In [0]:
# Expose the DataFrame to SQL under the name "my_temp_table". The "OrReplace"
# variant overwrites an existing view of that name, so the cell can be re-run.
spark_df.createOrReplaceTempView("my_temp_table")

In [0]:
%sql
-- List the views visible to this session; the temp view registered in the previous cell should appear.
SHOW VIEWS;

namespace,viewName,isTemporary,isMaterialized
,my_temp_table,True,False


In [0]:
# Programmatic counterpart of SHOW VIEWS: fetch the catalog entries and
# keep only the names of those whose tableType is 'TEMPORARY'.
temp_views = spark.catalog.listTables()
temp_view_names = [
    entry.name
    for entry in temp_views
    if entry.tableType == 'TEMPORARY'
]
print(temp_view_names)

['my_temp_table']


In [0]:
# Query the temp view through Spark SQL; spark.sql() returns the result as a Spark DataFrame.
spark_qdf = spark.sql("SELECT * FROM my_temp_table")

### Delta writes/reads with spark

In [0]:
# Storage location for the Delta table used throughout the notebook.
# NOTE(review): Spark resolves this to "dbfs:/dbfs/tmp/delta-table" (see the
# DESCRIBE DETAIL output at the end of the notebook), i.e. a literal "dbfs"
# folder inside DBFS. Confirm that is intended rather than "dbfs:/tmp/delta-table".
# The same path is hard-coded in the CREATE TABLE ... LOCATION cell; keep the two in sync.
delta_path = "/dbfs/tmp/delta-table"

In [0]:
# Persist the DataFrame in Delta format at delta_path. mode("overwrite")
# replaces whatever is already stored there, which keeps the cell re-runnable.
spark_df.write.format("delta").mode("overwrite").save(delta_path)

In [0]:
# # Write with overwrite mode
# spark_df.write.format("delta").mode("overwrite").save("/path/to/delta-table")
# # Write with append mode
# spark_df.write.format("parquet").mode("append").save("/path/to/parquet-file")

# # Write and partition by col1
# spark_df.write.format("delta").partitionBy("col1").save("/path/to/delta-table")
# # Write and partition by col1
# spark_df.write.format("parquet").partitionBy("col1").save("/path/to/parquet-file")

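# Overwrite only part of a table: Delta Lake's `replaceWhere` option restricts an overwrite to the rows/partitions matching a predicate, leaving the rest of the data untouched.
# spark_df.write.format("delta").mode("overwrite").option("replaceWhere", "col1 = 'some_value'").save("/path/to/delta-table")
# Overwrite the data AND the schema: `overwriteSchema` lets an overwrite replace the table's existing schema (it does not target specific partitions).
# spark_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("/path/to/delta-table")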

In [0]:
# Load the Delta table back from the path it was written to above.
delta_df = spark.read.format("delta").load(delta_path)

In [0]:
# # Read Parquet format
# parquet_df = spark.read.format("parquet").load("/path/to/parquet-file")

### Register tables to hive (1)

In [0]:
%sql
-- Create the database that the registered tables will live in; a no-op when it already exists.
CREATE DATABASE IF NOT EXISTS my_database;

In [0]:
%sql
-- Remove any existing catalog entry named my_table, so that the following
-- CREATE TABLE IF NOT EXISTS is not silently skipped on a re-run.
DROP TABLE IF EXISTS my_database.my_table

In [0]:
%sql
-- Register the Delta files written earlier as a catalog table. No column list
-- is given, so the schema presumably comes from the Delta data at that location.
-- The LOCATION literal must stay identical to the Python variable `delta_path`.
CREATE TABLE IF NOT EXISTS my_database.my_table
USING DELTA
LOCATION '/dbfs/tmp/delta-table';

In [0]:
%sql
-- Preview a few rows to confirm the registered table is readable.
SELECT * FROM my_database.my_table LIMIT 5;

rnd_spend,administration,marketing_spend,state,profit
165349.2,136897.8,471784.1,New York,192261.83
162597.7,151377.59,443898.53,California,191792.06
153441.51,101145.55,407934.54,Florida,191050.39
144372.41,118671.85,383199.62,New York,182901.99
142107.34,91391.77,366168.42,Florida,166187.94


### Register tables to hive (2)

In [0]:
# Second way to register a table: write a DataFrame straight into the catalog
# with saveAsTable. Only two columns are kept. No format or path is specified,
# so both are left to the session defaults. mode("overwrite") replaces the
# table when the cell is re-run.
delta_df.select("rnd_spend","profit").write.mode("overwrite").saveAsTable("my_database.my_table_2cols")

In [0]:
%sql
-- List the tables in my_database. As the output shows, session temp views
-- are listed as well, with an empty database column.
SHOW TABLES IN my_database;

database,tableName,isTemporary
my_database,iris,False
my_database,my_feats,False
my_database,my_table,False
my_database,my_table_2cols,False
,my_temp_table,True


In [0]:
%sql
-- Preview the two-column table created with saveAsTable.
SELECT * FROM my_database.my_table_2cols LIMIT 5;

rnd_spend,profit
165349.2,192261.83
162597.7,191792.06
153441.51,191050.39
144372.41,182901.99
142107.34,166187.94


### List tables and temp views

In [0]:
%sql
-- No database qualifier: lists what is visible from the current database,
-- which in the recorded output is only the session temp view.
SHOW TABLES;

database,tableName,isTemporary
,my_temp_table,True


In [0]:
%sql
-- Views only: persistent tables are not part of this listing.
SHOW VIEWS;

namespace,viewName,isTemporary,isMaterialized
,my_temp_table,True,False


In [0]:
# Catalog API counterpart of SHOW TABLES IN my_database. Returns a list of
# Table objects; per the output below, session temp views are included too.
table_list = spark.catalog.listTables('my_database')
# Cell output: the raw Table objects.
table_list

[Table(name='my_table', catalog='spark_catalog', namespace=['my_database'], description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='my_temp_table', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [0]:
# Flatten the catalog entries into (name, database, isTemporary) tuples.
# The result is stored under a new name so that `table_list` keeps holding the
# original Table objects. Overwriting it made this cell fail on a second run,
# because the tuples it produced have no `.name` attribute.
table_summary = [(x.name,x.database,x.isTemporary) for x in table_list]
print(table_summary)

[('my_table', 'my_database', False), ('my_temp_table', None, True)]


### Read tables with spark sql

In [0]:
# Read the registered catalog table through Spark SQL and render the
# resulting DataFrame with the notebook's display() helper.
display(spark.sql("SELECT * FROM my_database.my_table"))

rnd_spend,administration,marketing_spend,state,profit
165349.2,136897.8,471784.1,New York,192261.83
162597.7,151377.59,443898.53,California,191792.06
153441.51,101145.55,407934.54,Florida,191050.39
144372.41,118671.85,383199.62,New York,182901.99
142107.34,91391.77,366168.42,Florida,166187.94
131876.9,99814.71,362861.36,New York,156991.12
134615.46,147198.87,127716.82,California,156122.51
130298.13,145530.06,323876.68,Florida,155752.6
120542.52,148718.95,311613.29,New York,152211.77
123334.88,108679.17,304981.62,California,149759.96


In [0]:
# Read the session-scoped temp view the same way; it is addressed without a
# database prefix.
display(spark.sql("SELECT * FROM my_temp_table"))

rnd_spend,administration,marketing_spend,state,profit
165349.2,136897.8,471784.1,New York,192261.83
162597.7,151377.59,443898.53,California,191792.06
153441.51,101145.55,407934.54,Florida,191050.39
144372.41,118671.85,383199.62,New York,182901.99
142107.34,91391.77,366168.42,Florida,166187.94
131876.9,99814.71,362861.36,New York,156991.12
134615.46,147198.87,127716.82,California,156122.51
130298.13,145530.06,323876.68,Florida,155752.6
120542.52,148718.95,311613.29,New York,152211.77
123334.88,108679.17,304981.62,California,149759.96


### Play with catalogue tables

In [0]:
%sql
-- In-place UPDATE through the catalog table: rewrite the state label
-- 'New York' as 'NewYork'. num_affected_rows = 0 means no row matched the
-- WHERE clause (for example because the update was already applied).
UPDATE my_database.my_table
SET state = 'NewYork' WHERE state = 'New York';

num_affected_rows
0


In [0]:
# Re-read the Delta path after the SQL UPDATE. my_database.my_table was
# registered at this same location, so the change should be visible here.
delta_df_changed = spark.read.format("delta").load(delta_path)

In [0]:
%sql
-- Remove any existing my_feats entry so the CREATE TABLE IF NOT EXISTS in the
-- next cell is actually executed on a re-run.
DROP TABLE IF EXISTS my_database.my_feats;

In [0]:
%sql
-- Build a feature table from my_database.my_table and store it in Delta
-- format at the given LOCATION. Because of IF NOT EXISTS the statement is
-- skipped when my_feats is already registered.
CREATE TABLE IF NOT EXISTS my_database.my_feats
USING DELTA
LOCATION '/dbfs/tmp/delta/feats'
(
  SELECT 
  state
  ,rnd_spend
  -- shorter aliases for the other two spend columns
  ,administration as adm_spend
  ,marketing_spend as mrk_spend
  -- total_spend: sum of the three spend columns, rounded to 2 decimals
  ,ROUND(rnd_spend + administration + marketing_spend,2) as total_spend
  -- *_pct: each category's share of the (unrounded) total, rounded to 2 decimals
  ,ROUND(rnd_spend/(rnd_spend + administration + marketing_spend),2) as rnd_pct
  ,ROUND(administration/(rnd_spend + administration + marketing_spend),2) as adm_pct
  ,ROUND(marketing_spend/(rnd_spend + administration + marketing_spend),2) as mrk_pct
  ,profit
  FROM my_database.my_table
);

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Preview the derived feature table.
SELECT * FROM my_database.my_feats LIMIT 5;

state,rnd_spend,adm_spend,mrk_spend,total_spend,rnd_pct,adm_pct,mrk_pct,profit
NewYork,165349.2,136897.8,471784.1,774031.1,0.21,0.18,0.61,192261.83
California,162597.7,151377.59,443898.53,757873.82,0.21,0.2,0.59,191792.06
Florida,153441.51,101145.55,407934.54,662521.6,0.23,0.15,0.62,191050.39
NewYork,144372.41,118671.85,383199.62,646243.88,0.22,0.18,0.59,182901.99
Florida,142107.34,91391.77,366168.42,599667.53,0.24,0.15,0.61,166187.94


In [0]:
# Same preview as the %sql cell above, issued from Python via spark.sql().
display(spark.sql('SELECT * FROM my_database.my_feats LIMIT 5;'))

state,rnd_spend,adm_spend,mrk_spend,total_spend,rnd_pct,adm_pct,mrk_pct,profit
NewYork,165349.2,136897.8,471784.1,774031.1,0.21,0.18,0.61,192261.83
California,162597.7,151377.59,443898.53,757873.82,0.21,0.2,0.59,191792.06
Florida,153441.51,101145.55,407934.54,662521.6,0.23,0.15,0.62,191050.39
NewYork,144372.41,118671.85,383199.62,646243.88,0.22,0.18,0.59,182901.99
Florida,142107.34,91391.77,366168.42,599667.53,0.24,0.15,0.61,166187.94


In [0]:
# Read the feature table directly from its storage path instead of going
# through the catalog; this is the LOCATION used when my_feats was created.
spark_feats = spark.read.format("delta").load('/dbfs/tmp/delta/feats')
display(spark_feats)

state,rnd_spend,adm_spend,mrk_spend,total_spend,rnd_pct,adm_pct,mrk_pct,profit
NewYork,165349.2,136897.8,471784.1,774031.1,0.21,0.18,0.61,192261.83
California,162597.7,151377.59,443898.53,757873.82,0.21,0.2,0.59,191792.06
Florida,153441.51,101145.55,407934.54,662521.6,0.23,0.15,0.62,191050.39
NewYork,144372.41,118671.85,383199.62,646243.88,0.22,0.18,0.59,182901.99
Florida,142107.34,91391.77,366168.42,599667.53,0.24,0.15,0.61,166187.94
NewYork,131876.9,99814.71,362861.36,594552.97,0.22,0.17,0.61,156991.12
California,134615.46,147198.87,127716.82,409531.15,0.33,0.36,0.31,156122.51
Florida,130298.13,145530.06,323876.68,599704.87,0.22,0.24,0.54,155752.6
NewYork,120542.52,148718.95,311613.29,580874.76,0.21,0.26,0.54,152211.77
California,123334.88,108679.17,304981.62,536995.67,0.23,0.2,0.57,149759.96


### List catalogue table paths

In [0]:
%sql
-- List what is registered in my_database. The unqualified variant is kept
-- commented out for reference.
-- SHOW TABLES;
SHOW TABLES IN my_database;

database,tableName,isTemporary
my_database,my_feats,False
my_database,my_table,False
,my_temp_table,True


In [0]:
# List all tables in a specific database.
# `database_name` is also used by the get_table_details(...) call further down.
database_name = "my_database"
tables = spark.catalog.listTables(database_name)
# Print each entry as "<database>.<name>". Temp views carry no database, so
# they are printed with a "None." prefix (see the output below).
for table in tables:
    print(f"{table.database}.{table.name}")

# # List all tables in the current database
# tables = spark.catalog.listTables()
# for table in tables:
#     print(table.name)

my_database.my_feats
my_database.my_table
None.my_temp_table


In [0]:
# DESCRIBE DETAIL returns table metadata; take the first row and pull out the
# storage location of my_feats.
spark.sql("DESCRIBE DETAIL my_database.my_feats").collect()[0]['location']

'dbfs:/dbfs/tmp/delta/feats'

In [0]:
def get_table_details(database:str)->None:
    """Display the storage location of every persistent table in ``database``.

    Lists the catalog entries for ``database``, skips session-scoped temporary
    views as well as catalog views, runs ``DESCRIBE DETAIL`` on each remaining
    table and renders the collected rows as a pandas DataFrame via ``display``.

    Args:
        database: Name of the database whose tables are inspected.

    Returns:
        None. The summary is shown with ``display`` rather than returned.
    """
    # Fixed column order, so an empty database still renders a frame with headers.
    columns = ['database', 'name', 'isTemporary', 'location']
    table_list = []
    for t in spark.catalog.listTables(database):
        # DESCRIBE DETAIL only works on tables: temp views have no database to
        # qualify them with, and Delta rejects the command for views.
        if t.isTemporary or t.tableType == 'VIEW':
            continue
        query = f"DESCRIBE DETAIL {t.database}.{t.name}"
        # Only the first row of the DESCRIBE DETAIL result is used.
        details = spark.sql(query).collect()[0]
        table_list.append({
            'database' : t.database,
            'name' : t.name,
            'isTemporary' : t.isTemporary,
            'location' : details["location"]
        })
    display(pd.DataFrame(table_list, columns=columns))

In [0]:
# Show the storage location of each persistent table in my_database
# (`database_name` is set in the table-listing cell above).
get_table_details(database_name)

database,name,isTemporary,location
my_database,my_feats,False,dbfs:/dbfs/tmp/delta/feats
my_database,my_table,False,dbfs:/dbfs/tmp/delta-table
