In [1]:
import getpass

from pyspark.sql import SparkSession

# Current OS user; used below in the warehouse directory and the application name.
username = getpass.getuser()

# Build (or reuse) a Hive-enabled SparkSession that runs on YARN.
# NOTE(review): the app name mentions "Windowing Functions" while the cells
# below deal with Spark SQL tables - confirm the name is intended.
spark = (
    SparkSession.builder
    .config('spark.ui.port', '0')
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .enableHiveSupport()
    .appName(f'{username} | Python - Windowing Functions')
    .master('yarn')
    .getOrCreate()
)



In [2]:
# print("""
#     Spark SQL consists of:
#     - Data files (the data itself, stored on disk - data lakes)
#     - Metadata   (stored in the metastore - a database, Derby by default)
    
#     Spark table (created through Spark SQL): a persistent store that works like a table in a database.
#     - A Spark table is accessible from other sessions and can be used by other platforms such as Power BI or Tableau.
    
# """)

In [3]:
# Input file that the next cell reads into `df`.
file_path = '/public/trendytech/retail_db/orders/part-00000'

In [4]:
# Column names and types for the CSV, given as a DDL-style string.
schema = "id Integer,order_date date, customer_id Integer, status String "

# Read the file at `file_path` as CSV using the explicit schema.
df = (
    spark.read
    .format('csv')
    .schema(schema)
    .option('path', file_path)
    .load()
)

# Preview the first three rows.
df.show(3)

+---+----------+-----------+---------------+
| id|order_date|customer_id|         status|
+---+----------+-----------+---------------+
|  1|2013-07-25|      11599|         CLOSED|
|  2|2013-07-25|        256|PENDING_PAYMENT|
|  3|2013-07-25|      12111|       COMPLETE|
+---+----------+-----------+---------------+
only showing top 3 rows



In [42]:
# Relative destination directory for a CSV copy of `df`
# (the external table created later uses the same relative location 'orders').
dest_path = 'orders'

# Save mode 'overwrite' replaces existing output at the destination.
(
    df.write
    .mode('overwrite')
    .format('csv')
    .option('path', dest_path)
    .save()
)

In [5]:
# Register the DataFrame as a temporary view so it can be queried with SQL.

# df.createTempView                - raises an error if a view with that name already exists
# df.createOrReplaceTempView       - replaces an existing view with that name; no error is raised
# df.createOrReplaceGlobalTempView - the view is shared beyond the current session
#     NOTE(review): per the Spark docs a global temp view is shared across sessions of the
#     same Spark application (queried via the `global_temp` database), not across
#     separate applications - confirm.

df.createOrReplaceTempView('my_orders')

# Query the view with SQL; the result is again a DataFrame.
new_df = spark.sql("select * from my_orders limit 3 ")

new_df.show(3)


+---+----------+-----------+---------------+
| id|order_date|customer_id|         status|
+---+----------+-----------+---------------+
|  1|2013-07-25|      11599|         CLOSED|
|  2|2013-07-25|        256|PENDING_PAYMENT|
|  3|2013-07-25|      12111|       COMPLETE|
+---+----------+-----------+---------------+



In [6]:
# DataFrames and Spark tables/views can be used interchangeably:
# here the temp view 'my_orders' is read back as a DataFrame.

spark_table_df = spark.read.table('my_orders')

spark_table_df.show(6)


+---+----------+-----------+---------------+
| id|order_date|customer_id|         status|
+---+----------+-----------+---------------+
|  1|2013-07-25|      11599|         CLOSED|
|  2|2013-07-25|        256|PENDING_PAYMENT|
|  3|2013-07-25|      12111|       COMPLETE|
|  4|2013-07-25|       8827|         CLOSED|
|  5|2013-07-25|      11318|       COMPLETE|
|  6|2013-07-25|       7130|       COMPLETE|
+---+----------+-----------+---------------+
only showing top 6 rows



In [7]:
# List the available databases (show() prints the first 20 rows by default).
# show() is used instead of print(): printing a DataFrame only renders rows when
# eager evaluation is enabled; otherwise it prints just a schema summary.
spark.sql("show databases").show()

+-------------------+
|          namespace|
+-------------------+
|  0001_av_ivy_tesco|
|       003402_hive1|
|   005198_ivy_tesco|
|   005212_ivy_tesco|
|005222_ivy_practice|
|005260_ivy_database|
|        00ivy_tesco|
|         00ivy_test|
|      07172021_nyse|
|    07172021_retail|
|       07172021_sms|
|        1230_trendy|
|    1230_trendytech|
|      1540retail_db|
|        1993_ankita|
|               1src|
|              26may|
|               2stg|
|               3etl|
|           44_tesco|
+-------------------+
only showing top 20 rows



In [8]:
# No database has been selected yet, so "show tables" lists the tables of the `default` database.
spark.sql("show tables").show()

+--------+-----------------+-----------+
|database|        tableName|isTemporary|
+--------+-----------------+-----------+
| default|            1htab|      false|
| default|   41group_movies|      false|
| default|    4group_movies|      false|
| default|             4tab|      false|
| default|    6_flags_simon|      false|
| default|               aa|      false|
| default|              abc|      false|
| default|             acid|      false|
| default|            acid1|      false|
| default|     acid_example|      false|
| default|    acid_example1|      false|
| default|    acid_example2|      false|
| default|           adata1|      false|
| default|        adata_ell|      false|
| default|         adata_vr|      false|
| default|    ad_earthquake|      false|
| default|ad_earthquake_par|      false|
| default|           adelta|      false|
| default|       adeltapart|      false|
| default|   adeltapartbuck|      false|
+--------+-----------------+-----------+
only showing top

In [9]:
# Create a new database; "if not exists" makes the statement safe to re-run.

spark.sql("CREATE DATABASE if not exists itv_006327_retail") 

In [10]:

from pyspark.sql.functions import col

# Keep only the databases whose name starts with `itv_`, continues with digits
# followed by `27`, then word characters, and ends in `l`.
# The pattern is a raw string so that \d and \w reach the regex engine unchanged
# instead of being treated as (invalid) Python string escape sequences.
database_df = spark.sql("show databases").filter(
    col("namespace").rlike(r"^itv_\d+27\w+l$")
)

database_df.show()

+-----------------+
|        namespace|
+-----------------+
|itv_006327_retail|
+-----------------+



In [12]:
# Switch the session to a specific database; the unqualified table names used
# in the following cells refer to tables in it.

spark.sql('use itv_006327_retail')

In [13]:
# Create the `employee` table with two columns; "if not exists" keeps the cell re-runnable.
spark.sql("""CREATE TABLE if not exists employee (id int, name string)""")

In [15]:
# List the tables now visible; the recorded output also lists the temp view
# `my_orders`, with an empty database column and isTemporary = True.
spark.sql("show tables")

database,tableName,isTemporary
itv_006327_retail,employee,False
itv_006327_retail,itv006327_externa...,False
,my_orders,True


In [16]:
# Append two rows to the employee table.
# NOTE(review): INSERT INTO appends, so every re-run of this cell adds the same
# rows again - presumably why the next cell's recorded output shows each row
# three times.
spark.sql("""INSERT INTO itv_006327_retail.employee VALUES  
            (1,'Shashank'),
            (2,'Rj')"""
         )

In [17]:
# Read back all rows of the employee table.
spark.sql('select * FROM employee')

id,name
1,Shashank
1,Shashank
1,Shashank
2,Rj
2,Rj
2,Rj


In [18]:
#  Suppose you want to load data from a temp view into a permanent table:
#  1 - First register the DataFrame as a temp view with df.createOrReplaceTempView
#  2 - Create the new table, then insert the data from the temp view.

#  Ex.   spark.sql("""INSERT INTO itv_006327_retail.orders SELECT * FROM orders """)

In [19]:
# Show the columns of a table with their data types and comments.
spark.sql("describe table itv_006327_retail.employee ")

col_name,data_type,comment
id,int,
name,string,


In [20]:
# Show more details about a table: the columns are followed by a
# "# Detailed Table Information" section.
# truncate = False prints the full cell values instead of shortening them.
spark.sql("describe extended  itv_006327_retail.employee ").show(truncate = False)

+----------------------------+------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                           |comment|
+----------------------------+------------------------------------------------------------------------------------+-------+
|id                          |int                                                                                 |null   |
|name                        |string                                                                              |null   |
|                            |                                                                                    |       |
|# Detailed Table Information|                                                                                    |       |
|Database                    |itv_006327_retail                                                                   |       |
|Table  

In [43]:

"""
  Create an external table
  
  - When you drop an external table, only its metadata is deleted; the data files stay in place
  - You just specify the location of the data along with its file format

"""

# Create a table over the CSV files already stored at the 'orders' location.
# "if not exists" keeps the cell re-runnable (a plain CREATE TABLE fails once the
# table exists) and matches the other CREATE statements in this notebook.
spark.sql(""" CREATE TABLE if not exists itv006327_ext_orders
                 (
                     order_id Integer,
                     order_date string,
                     customer_id Integer,
                     status String
                 )
              using csv location 'orders'
              """
         )

In [44]:
# List the tables again to confirm the new table is registered;
# truncate=False prints the full table names.
spark.sql('show tables').show(truncate=False)

+-----------------+-------------------------+-----------+
|database         |tableName                |isTemporary|
+-----------------+-------------------------+-----------+
|itv_006327_retail|employee                 |false      |
|itv_006327_retail|itv006327_external_orders|false      |
|itv_006327_retail|itv006327_ext_orders     |false      |
|                 |my_orders                |true       |
+-----------------+-------------------------+-----------+



In [45]:
# Query the new table; the rows come from the CSV data at its location.
spark.sql("select * FROM itv006327_ext_orders  Limit 3").show(3)

+--------+----------+-----------+---------------+
|order_id|order_date|customer_id|         status|
+--------+----------+-----------+---------------+
|       1|2013-07-25|      11599|         CLOSED|
|       2|2013-07-25|        256|PENDING_PAYMENT|
|       3|2013-07-25|      12111|       COMPLETE|
+--------+----------+-----------+---------------+



In [46]:
# Show the columns and data types of the external table.
spark.sql('describe itv006327_ext_orders')

col_name,data_type,comment
order_id,int,
order_date,string,
customer_id,int,
status,string,


In [47]:
# Show the extended description of the external table without shortening long values.
spark.sql('describe extended itv006327_ext_orders').show(truncate=False)

+----------------------------+---------------------------------------------------------+-------+
|col_name                    |data_type                                                |comment|
+----------------------------+---------------------------------------------------------+-------+
|order_id                    |int                                                      |null   |
|order_date                  |string                                                   |null   |
|customer_id                 |int                                                      |null   |
|status                      |string                                                   |null   |
|                            |                                                         |       |
|# Detailed Table Information|                                                         |       |
|Database                    |itv_006327_retail                                        |       |
|Table                       |

In [48]:
from pyspark.sql.utils import AnalysisException

# TRUNCATE TABLE is not allowed on external tables. The failure is the point of
# this cell, so it is caught and printed: an uncaught exception would abort
# "Restart & Run All" before the remaining cells execute.
try:
    spark.sql('truncate table itv006327_ext_orders')
except AnalysisException as exc:
    print(f"AnalysisException: {exc}")

AnalysisException: Operation not allowed: TRUNCATE TABLE on external tables: `itv_006327_retail`.`itv006327_ext_orders`;

In [50]:
"""
    You will notice that these new records have been inserted into a new file; the original file remains unchanged.
    
    In open-source Spark we can only do
    
    - SELECT
    - INSERT
    
    We can't do UPDATE and DELETE, which we can do in Databricks Spark.
"""

# Append one row to the external table.
# NOTE(review): INSERT INTO appends, so re-running this cell adds the same row again.
spark.sql(""" INSERT INTO TABLE itv_006327_retail.itv006327_ext_orders VALUES (11111,'2023-01-01', 2222, 'CLOSED') """)

In [1]:
# Show the kernel's current working directory.
%pwd

'/home/itv006327/${system:java.io.tmpdir}'

In [2]:
# List the contents of the current working directory.
%ls

DataFrame_more.ipynb  spark_sql.ipynb  Untitled Folder/
learn/                Untitled1.ipynb
