### O.S. process

In [1]:
import warnings
warnings.filterwarnings('ignore')

from os import environ

!host=$(hostname) 
!ip=$(ifconfig | grep 'inet ' | grep -v '127.0.0.1' | cut -c 7-17)
!echo "hostname: $(hostname)"

# create path if not exists
#!mkdir -p ~/notebooks/data/

# download if not exists
#!wget -nc https://files.grouplens.org/datasets/movielens/ml-25m.zip -P ~/notebooks/data/

# unzip if not exists
#!unzip -n ~/notebooks/data/ml-25m.zip -d ~/notebooks/data/

#!ls -las /home/admin/notebooks/data/ml-25m

# check environment variables: JAVA_HOME
!export JAVA_HOME=/opt/jdk
environ["JAVA_HOME"] = "/opt/jdk"
!echo "- JAVA_HOME:$JAVA_HOME"

# check environment variables: PYSPARK_SUBMIT_ARGS
!export PYSPARK_SUBMIT_ARGS='--packages io.delta:delta-core_2.12:2.1.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" pyspark-shell'
environ["PYSPARK_SUBMIT_ARGS"]='--packages io.delta:delta-core_2.12:2.1.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" pyspark-shell'
!echo "- PYSPARK_SUBMIT_ARGS:$PYSPARK_SUBMIT_ARGS"

# check environment variables: PATH
!export PATH=$PATH:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/jdk:/opt/jdk/bin
environ["PATH"] = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/jdk:/opt/jdk/bin"
!echo "- PATH:$PATH"

!pip freeze

# check java version
!java -version

hostname: jupyter-hub
- JAVA_HOME:/opt/jdk
- PYSPARK_SUBMIT_ARGS:--packages io.delta:delta-core_2.12:2.1.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" pyspark-shell
- PATH:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/jdk:/opt/jdk/bin
dbus-python==1.2.18
delta-spark==2.1.0
gyp==0.1
importlib-metadata==5.0.0
numpy==1.23.4
py4j==0.10.9.5
pyarrow==9.0.0
PyGObject==3.42.1
pyspark==3.3.0
zipp==3.9.0
java version "1.8.0_341"
Java(TM) SE Runtime Environment (build 1.8.0_341-b10)
Java HotSpot(TM) 64-Bit Server VM (build 25.341-b10, mixed mode)


### Function to reduce memory usage in Pandas DataFrame

In [2]:
import numpy as np

def reduce_mem_usage(df):
    """ iterate through all the columns of a dataframe and modify the data type
        to reduce memory usage.
    """
    start_mem = df.memory_usage().sum() / 1024**2
    print('Memory usage of dataframe is {:.2f} MB'.format(start_mem))
    
    for col in df.columns:
        col_type = df[col].dtype
        
        if col_type != object:
            c_min = df[col].min()
            c_max = df[col].max()
            if str(col_type)[:3] == 'int':
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)  
            else:
                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    df[col] = df[col].astype(np.float16)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float64)
        else:
            df[col] = df[col].astype('category')

    end_mem = df.memory_usage().sum() / 1024**2
    print('Memory usage after optimization is: {:.2f} MB'.format(end_mem))
    print('Decreased by {:.1f}%\n'.format(100 * (start_mem - end_mem) / start_mem))
    
    return df

### Connect from Apache Spark Cluster - without Delta Lake

In [None]:
#from os import environ
#environ["SPARK_HOME"] = '/opt/apache-spark'
#environ["PATH"] = '$PATH:/opt/jdk:/opt/jdk/bin:/opt/apache-spark:/opt/apache-spark/bin:/opt/apache-spark/sbin'

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, DataFrameReader

#If you need to stop SparkContext (sc) or SparkSession
if 's_session' in locals():
    s_session.stop()
if 's_context' in locals():
    s_context.stop()

conf = SparkConf()

conf.setAppName("app_data_lake") \
.setMaster("spark://spark-master:7077") \
.setSparkHome("/opt/apache-spark")

s_context = SparkContext(conf=conf).getOrCreate()
s_session = SparkSession(sparkContext=s_context)
#builder = s_session.builder

### Read .CSV from SFTP and load into a Pandas DataFrame

In [None]:
import pysftp
from pandas import read_csv as pandas_read_csv
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# SFTP config connection
cnopts = pysftp.CnOpts()
cnopts.hostkeys = None
environ["FTP_HOST"] = 'sftp-01' # sftp-01 = 172.19.0.15
environ["FTP_PORT"] = '2222'
environ["FTP_USER"] = 'admin'
environ["FTP_PASS"] = 'admin'

# CSV schema
schema_doc = {
                "tags.csv": StructType([StructField("userId", IntegerType(), True),
                                     StructField("movieId", IntegerType(), True),
                                     StructField("tag", StringType(), True),
                                     StructField("timestamp", IntegerType(), True)]),
                "ratings.csv": StructType([StructField("userId", IntegerType(), True),
                                     StructField("movieId", IntegerType(), True),
                                     StructField("rating", FloatType(), True),
                                     StructField("timestamp", IntegerType(), True)])
                }

chunksize=500000
sftp_file="tags.csv" # 1.093.000 lines
#sftp_file="ratings.csv" # 25.000.000 lines

# open SFTP connection
with pysftp.Connection(environ["FTP_HOST"], port = int(environ["FTP_PORT"]), username = environ["FTP_USER"], password = environ["FTP_PASS"], cnopts=cnopts) as connection:
    print("Connection succesfully established…\n")
    # open the file
    with connection.open(remote_file = f"/data/{sftp_file}", mode='r') as file:

        i = 1
        for reader in pandas_read_csv(file, sep=',', chunksize=chunksize):
            chnk = "0 until " + str(chunksize) if i==1 else str(((chunksize * i) - chunksize) + 1) + " until " + str(chunksize * i)
            print(f"Chunksize block = line {chnk}")
            reader=reduce_mem_usage(df=reader)
            if i == 1:
                data = s_session.createDataFrame(data=reader, schema=schema_doc.get(sftp_file, sftp_file.split('.')[0]))
            else:
                new_data = s_session.createDataFrame(data=reader, schema=schema_doc.get(sftp_file, sftp_file.split('.')[0]))
                data = data.union(new_data)
                del new_data
            i = i + 1

connection.close()

### Resume of data

In [None]:
print( "- sparkSession: ", data.sparkSession, '\n' )
print("- Object: ", type(data), "\n")
print( "- schema: ", data.schema, '\n' )
print( "- printSchema: ", data.printSchema(), '\n' )
print( "- isStreaming: ", data.isStreaming, '\n' )
print( "- columns: ", data.columns, '\n' )
print( "- dtypes: ", data.dtypes, '\n' )
print( "- head: ", data.head(10), '\n' )
print( "- show: ", data.show(10), '\n' )
print( "- isEmpty: ", data.isEmpty(), '\n' )
print("- cache", data.cache(), '\n' ) # Persists the DataFrame with the default storage level (MEMORY_AND_DISK)
print( "- persist: ", data.persist(), '\n' ) # Sets the storage level to persist the contents of the DataFrame across operations after the first time it is computed.
print( "- storageLevel: ", data.storageLevel, '\n' )
print( "- count: ", data.count(), '\n' )
if sftp_file=="ratings.csv":
    print( "- correlation between rating and timestamp: ", data.corr("rating", "timestamp"), '\n' )
    print( "- covariance between rating and timestamp: ", data.cov("rating", "timestamp"), '\n' ) # Calculate the sample covariance for the given columns, specified by their names, as a double value
    print( "- descriptive statistics: ", data.describe(["userId", "movieId", "rating", "timestamp"]).show(), '\n' )
#print( "- summary: ", data.summary(), '\n' ) # Computes specified statistics for numeric and string columns

### Write DataFrame in HDFS

In [None]:
data.write.csv("hdfs://hdpmaster:9000/users/hduser/teste1.csv", header=True, mode="ignore")
data.write.parquet("hdfs://hdpmaster:9000/users/hduser/teste1.parquet", mode="ignore")

### Read data from HDFS

In [None]:
df_load_csv = s_session.read.csv("hdfs://hdpmaster:9000/users/hduser/teste1.csv", header='true', inferSchema='true')
df_load_parquet = s_session.read.parquet("hdfs://hdpmaster:9000/users/hduser/teste1.parquet")

In [None]:
print("CSV FILE:", "\n")
print( "- sparkSession: ", df_load_csv.sparkSession, '\n' )
print("- Object: ", type(df_load_csv), "\n")
print( "- schema: ", df_load_csv.schema, '\n' )
print( "- printSchema: ", df_load_csv.printSchema(), '\n' )
print( "- isStreaming: ", df_load_csv.isStreaming, '\n' )
print( "- columns: ", df_load_csv.columns, '\n' )
print( "- dtypes: ", df_load_csv.dtypes, '\n' )
print( "- head: ", df_load_csv.head(10), '\n' )
print( "- show: ", df_load_csv.show(10), '\n' )
print("##########################################################")
print("PARQUET FILE:", "\n")
print( "- sparkSession: ", df_load_parquet.sparkSession, '\n' )
print("- Object: ", type(df_load_parquet), "\n")
print( "- schema: ", df_load_parquet.schema, '\n' )
print( "- printSchema: ", df_load_parquet.printSchema(), '\n' )
print( "- isStreaming: ", df_load_parquet.isStreaming, '\n' )
print( "- columns: ", df_load_parquet.columns, '\n' )
print( "- dtypes: ", df_load_parquet.dtypes, '\n' )
print( "- head: ", df_load_parquet.head(10), '\n' )
print( "- show: ", df_load_parquet.show(10), '\n' )

### Create a temporary table

In [None]:
s_session.sql("CREATE TEMPORARY VIEW teste USING parquet OPTIONS (path \"hdfs://hdpmaster:9000/users/hduser/teste1.parquet\")")

In [None]:
s_session.sql("select * from teste limit 10").show(truncate=False)

### Connect from Apache Spark Cluster - with Delta Lake Session

In [3]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

#If you need to stop SparkContext (sc) or SparkSession
if 's_session' in locals():
    s_session.stop()
if 's_context' in locals():
    s_context.stop()

#If you need to stop SparkContext (sc) or SparkSession
if 's_session_dl' in locals():
    s_session_dl.stop()
if 's_context_dl' in locals():
    s_context_dl.stop()

conf = SparkConf()

conf.setAppName("app_delta_lake") \
.setMaster("spark://spark-master:7077")
#.setSparkHome("/opt/apache-spark")
#.set("spark.jars.packages", "io.delta:delta-core_2.12:2.1.0")
#.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
#.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \

s_context_dl = SparkContext(conf=conf).getOrCreate()
s_session_dl = SparkSession(sparkContext=s_context_dl)
builder = s_session_dl.builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")


from delta import configure_spark_with_delta_pip, DeltaTable

#my_packages = ["io.delta:delta-core_2.12:2.1.0"]
#s_session_delta = configure_spark_with_delta_pip(spark_session_builder=builder, extra_packages=my_packages).getOrCreate()

s_session_delta = configure_spark_with_delta_pip(spark_session_builder=builder).getOrCreate()

:: loading settings :: url = jar:file:/srv/jupyterhub/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/admin/.ivy2/cache
The jars for the packages stored in: /home/admin/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a5918e50-88cc-423e-a711-0cef267a5d22;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.1.0 in central
	found io.delta#delta-storage;2.1.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
:: resolution report :: resolve 164ms :: artifacts dl 6ms
	:: modules in use:
	io.delta#delta-core_2.12;2.1.0 from central in [default]
	io.delta#delta-storage;2.1.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	org.codehaus.jackson#jackson-core-asl;1.9.13 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evict

22/10/20 17:57:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/20 17:57:40 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
from sys import version as sys_version

print('jupyter-hub python version:', sys_version)
print('context pyspark version:', s_context_dl.version)
print('context java spark version:', s_context_dl._jsc.version())
print(f'context hadoop version = {s_context_dl._jvm.org.apache.hadoop.util.VersionInfo.getVersion()}')

jupyter-hub python version: 3.10.6 (main, Aug 10 2022, 11:40:04) [GCC 11.3.0]
context pyspark version: 3.3.0
context java spark version: 3.3.0
context hadoop version = 3.3.2


### Read .CSV from SFTP and load into a Pandas DataFrame - with Delta Lake Session

In [5]:
import pysftp
from pandas import read_csv as pandas_read_csv
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType
from pyspark.sql.functions import current_timestamp, date_format

# SFTP config connection
cnopts = pysftp.CnOpts()
cnopts.hostkeys = None
environ["FTP_HOST"] = 'sftp-01' # sftp-01 = 172.19.0.15
environ["FTP_PORT"] = '2222'
environ["FTP_USER"] = 'admin'
environ["FTP_PASS"] = 'admin'

# CSV schema
schema_doc = {
                "tags.csv": StructType([StructField("userId", IntegerType(), True),
                                     StructField("movieId", IntegerType(), True),
                                     StructField("tag", StringType(), True),
                                     StructField("timestamp", IntegerType(), True)]),
                "ratings.csv": StructType([StructField("userId", IntegerType(), True),
                                     StructField("movieId", IntegerType(), True),
                                     StructField("rating", FloatType(), True),
                                     StructField("timestamp", IntegerType(), True)])
                }

chunksize=500000
sftp_file="tags.csv" # 1.093.000 lines
#sftp_file="ratings.csv" # 25.000.000 lines

# open SFTP connection
with pysftp.Connection(environ["FTP_HOST"], port = int(environ["FTP_PORT"]), username = environ["FTP_USER"], password = environ["FTP_PASS"], cnopts=cnopts) as connection:
    print("Connection succesfully established…\n")
    # open the file
    with connection.open(remote_file = f"/data/{sftp_file}", mode='r') as file:

        i = 1
        for reader in pandas_read_csv(file, sep=',', chunksize=chunksize):
            chnk = "0 until " + str(chunksize) if i==1 else str(((chunksize * i) - chunksize) + 1) + " until " + str(chunksize * i)
            print(f"Chunksize block = line {chnk}")
            reader=reduce_mem_usage(df=reader)
            if i == 1:
                data_dl = s_session_delta.createDataFrame(data=reader, schema=schema_doc.get(sftp_file, sftp_file.split('.')[0]))
            else:
                new_data_dl = s_session_delta.createDataFrame(data=reader, schema=schema_doc.get(sftp_file, sftp_file.split('.')[0]))
                data_dl = data_dl.union(new_data_dl)
                del new_data_dl
            i = i + 1

connection.close()

Connection succesfully established…

Chunksize block = line 0 until 500000
Memory usage of dataframe is 15.26 MB
Memory usage after optimization is: 8.99 MB
Decreased by 41.1%

Chunksize block = line 500001 until 1000000
Memory usage of dataframe is 15.26 MB
Memory usage after optimization is: 8.97 MB
Decreased by 41.2%

Chunksize block = line 1000001 until 1500000
Memory usage of dataframe is 2.85 MB
Memory usage after optimization is: 1.88 MB
Decreased by 33.9%



In [6]:
data_dl = data_dl.withColumn("created_datetime", current_timestamp())

In [7]:
data_dl.show(10)

[Stage 0:>                                                          (0 + 1) / 1]

+------+-------+--------------------+----------+--------------------+
|userId|movieId|                 tag| timestamp|    created_datetime|
+------+-------+--------------------+----------+--------------------+
|     3|    260|             classic|1439472355|2022-10-20 17:40:...|
|     3|    260|              sci-fi|1439472256|2022-10-20 17:40:...|
|     4|   1732|         dark comedy|1573943598|2022-10-20 17:40:...|
|     4|   1732|      great dialogue|1573943604|2022-10-20 17:40:...|
|     4|   7569|    so bad it's good|1573943455|2022-10-20 17:40:...|
|     4|  44665|unreliable narrators|1573943619|2022-10-20 17:40:...|
|     4| 115569|               tense|1573943077|2022-10-20 17:40:...|
|     4| 115713|artificial intell...|1573942979|2022-10-20 17:40:...|
|     4| 115713|       philosophical|1573943033|2022-10-20 17:40:...|
|     4| 115713|               tense|1573943042|2022-10-20 17:40:...|
+------+-------+--------------------+----------+--------------------+
only showing top 10 

                                                                                

In [8]:
print( "- count: ", data_dl.count())

22/10/20 16:16:45 WARN TaskSetManager: Stage 1 contains a task of very large size (1024 KiB). The maximum recommended task size is 1000 KiB.




- count:  1093360


                                                                                

### Create tables WITHOUT the metastore - Delta Lake

In [43]:
s_session_delta.sql("CREATE SCHEMA IF NOT EXISTS bronze LOCATION 'hdfs://hdpmaster:9000/deltalake/bronze';")
s_session_delta.sql("CREATE SCHEMA IF NOT EXISTS silver LOCATION 'hdfs://hdpmaster:9000/deltalake/silver';")
s_session_delta.sql("CREATE SCHEMA IF NOT EXISTS gold LOCATION 'hdfs://hdpmaster:9000/deltalake/gold';")

DataFrame[]

In [None]:
s_session_delta.sql("""
CREATE TABLE IF NOT EXISTS bronze.tags (
      userId INT,
      movieId INT,
      tag STRING,
      timestamp INT
    ) USING DELTA
      LOCATION 'hdfs://hdpmaster:9000/deltalake/bronze/tags'
      COMMENT 'Table to store movie tags.';
    """)

#data_dl.createOrReplaceTempView("bronze.tags")

s_session_delta.sql("""
CREATE TABLE IF NOT EXISTS bronze.ratings (
      userId INT,
      movieId INT,
      rating FLOAT,
      timestamp INT
    ) USING DELTA
      LOCATION 'hdfs://hdpmaster:9000/deltalake/bronze/ratings'
      COMMENT 'Table to store movie ratings.';
    """)

#data_dl.createOrReplaceTempView("bronze.ratings")

In [44]:
s_session_delta.sql("SHOW DATABASES;").show()

+---------+
|namespace|
+---------+
|   bronze|
|  default|
|     gold|
|   silver|
+---------+



In [45]:
s_session_delta.sql("SHOW SCHEMAS;").show()

+---------+
|namespace|
+---------+
|   bronze|
|  default|
|     gold|
|   silver|
+---------+



In [46]:
s_session_delta.sql("""DESCRIBE SCHEMA EXTENDED default""").head(20)

[Row(info_name='Namespace Name', info_value='default'),
 Row(info_name='Comment', info_value='default database'),
 Row(info_name='Location', info_value='file:/home/admin/notebooks/spark-warehouse'),
 Row(info_name='Owner', info_value='admin'),
 Row(info_name='Properties', info_value='')]

In [47]:
s_session_delta.sql("""DESCRIBE SCHEMA EXTENDED bronze""").head(20)

[Row(info_name='Namespace Name', info_value='bronze'),
 Row(info_name='Comment', info_value=''),
 Row(info_name='Location', info_value='file:/home/admin/notebooks/spark-warehouse/bronze.db'),
 Row(info_name='Owner', info_value='admin'),
 Row(info_name='Properties', info_value='')]

In [25]:
s_session_delta.sql("SHOW TABLES IN default;").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  default|     tags|      false|
+---------+---------+-----------+



In [32]:
s_session_delta.sql("SHOW TABLES IN bronze;").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|   bronze|     tags|      false|
+---------+---------+-----------+



In [41]:
s_session_delta.sql("SHOW TABLE EXTENDED IN bronze like 'tags'").head(20)

[Row(namespace='bronze', tableName='tags', isTemporary=False, information='Database: bronze\nTable: tags\nCreated Time: Thu Oct 20 17:59:11 GMT 2022\nLast Access: UNKNOWN\nCreated By: Spark 3.3.0\nType: EXTERNAL\nProvider: delta\nComment: Table to store the genre for each movie.\nLocation: hdfs://hdpmaster:9000/deltalake/bronze.tags\nPartition Provider: Catalog\n')]

### Create tables WITH the metastore - Delta Lake

In [61]:
s_session_delta.sql("CREATE SCHEMA IF NOT EXISTS bronze LOCATION 'hdfs://hdpmaster:9000/deltalake/bronze';")
s_session_delta.sql("CREATE SCHEMA IF NOT EXISTS silver LOCATION 'hdfs://hdpmaster:9000/deltalake/silver';")
s_session_delta.sql("CREATE SCHEMA IF NOT EXISTS gold LOCATION 'hdfs://hdpmaster:9000/deltalake/gold';")

DataFrame[]

In [62]:
s_session_delta.sql("""DESCRIBE SCHEMA EXTENDED bronze""").head(20)

[Row(info_name='Namespace Name', info_value='bronze'),
 Row(info_name='Comment', info_value=''),
 Row(info_name='Location', info_value='hdfs://hdpmaster:9000/deltalake/bronze'),
 Row(info_name='Owner', info_value='admin'),
 Row(info_name='Properties', info_value='')]

In [49]:
s_session_delta.sql("""DESCRIBE SCHEMA EXTENDED silver""").head(20)

[Row(info_name='Namespace Name', info_value='silver'),
 Row(info_name='Comment', info_value=''),
 Row(info_name='Location', info_value='file:/home/admin/notebooks/spark-warehouse/silver.db'),
 Row(info_name='Owner', info_value='admin'),
 Row(info_name='Properties', info_value='')]

In [63]:
s_session_delta.sql("""DESCRIBE SCHEMA EXTENDED gold""").head(20)

[Row(info_name='Namespace Name', info_value='gold'),
 Row(info_name='Comment', info_value=''),
 Row(info_name='Location', info_value='hdfs://hdpmaster:9000/deltalake/gold'),
 Row(info_name='Owner', info_value='admin'),
 Row(info_name='Properties', info_value='')]

In [64]:
# https://learn.microsoft.com/en-us/azure/databricks/delta/table-properties
s_session_delta.conf.set("delta.autoOptimize.autoCompact", "true")
s_session_delta.conf.set("delta.autoOptimize.optimizeWrite", "true")

In [65]:
# Create "bronze.tags" table in the metastore
DeltaTable.createIfNotExists(sparkSession=s_session_delta) \
  .tableName("bronze.tags") \
  .addColumn("userId", dataType=IntegerType(), nullable=True) \
  .addColumn("movieId", dataType=IntegerType(), nullable=True) \
  .addColumn("tag", dataType=StringType(), nullable=True, comment = "Movie genre.") \
  .addColumn("timestamp", dataType=IntegerType(), nullable=True) \
  .addColumn("created_datetime", dataType=TimestampType(), nullable=False) \
  .addColumn("created_date_year", dataType=IntegerType(), nullable=False, generatedAlwaysAs="YEAR(created_datetime)") \
  .addColumn("created_date_month", dataType=IntegerType(), nullable=False, generatedAlwaysAs="MONTH(created_datetime)") \
  .addColumn("created_date_day", dataType=IntegerType(), nullable=False, generatedAlwaysAs="DAY(created_datetime)") \
  .addColumn("modified_datetime", dataType=TimestampType(), nullable=True) \
  .comment("Table to store the genre for each movie.") \
  .property("description", "Table to store the genre for each movie.") \
  .location("hdfs://hdpmaster:9000/deltalake/bronze/tags") \
  .partitionedBy("created_date_year", "created_date_month") \
  .execute()

<delta.tables.DeltaTable at 0x7fbeec87c7f0>

In [66]:
# Create "bronze.ratings" table in the metastore
DeltaTable.createIfNotExists(sparkSession=s_session_delta) \
  .tableName("bronze.ratings") \
  .addColumn("userId", dataType=IntegerType(), nullable=True) \
  .addColumn("movieId", dataType=IntegerType(), nullable=True) \
  .addColumn("tag", dataType=StringType(), nullable=True, comment = "Movie genre.") \
  .addColumn("timestamp", dataType=IntegerType(), nullable=True) \
  .addColumn("created_datetime", dataType=TimestampType(), nullable=False) \
  .addColumn("created_date_year", dataType=IntegerType(), nullable=False, generatedAlwaysAs="YEAR(created_datetime)") \
  .addColumn("created_date_month", dataType=IntegerType(), nullable=False, generatedAlwaysAs="MONTH(created_datetime)") \
  .addColumn("created_date_day", dataType=IntegerType(), nullable=False, generatedAlwaysAs="DAY(created_datetime)") \
  .addColumn("modified_datetime", dataType=TimestampType(), nullable=True) \
  .comment("Movies classification ratings into a time series.") \
  .property("description", "Movies classification ratings into a time series.") \
  .location("hdfs://hdpmaster:9000/deltalake/bronze/ratings") \
  .partitionedBy("created_date_year", "created_date_month") \
  .execute()

<delta.tables.DeltaTable at 0x7fbeec87e200>

### Write data in table - Delta Lake

In [67]:
s_session_delta.sql("USE SCHEMA bronze;")
#ata_dl.write.format("delta").mode("append").save("hdfs://hdpmaster:9000/deltalake/bronze.tags")
data_dl.write.format("delta").mode("append").saveAsTable(name='bronze.tags')

# mode = append, overwrite, error, errorifexists, ignore

                                                                                

### Read data in table - Delta Lake

In [83]:
s_session_delta.sql("SELECT * FROM bronze.tags ORDER BY userId LIMIT 15;").show(15)

+------+-------+--------------------+----------+--------------------+-----------------+------------------+----------------+-----------------+
|userId|movieId|                 tag| timestamp|    created_datetime|created_date_year|created_date_month|created_date_day|modified_datetime|
+------+-------+--------------------+----------+--------------------+-----------------+------------------+----------------+-----------------+
|     3|    260|             classic|1439472355|2022-10-20 18:29:...|             2022|                10|              20|             null|
|     3|    260|              sci-fi|1439472256|2022-10-20 18:29:...|             2022|                10|              20|             null|
|     4| 168250|       unpredictable|1573945171|2022-10-20 18:29:...|             2022|                10|              20|             null|
|     4|   1732|         dark comedy|1573943598|2022-10-20 18:29:...|             2022|                10|              20|             null|
|     

In [69]:
s_session_delta.sql("SELECT COUNT(*) FROM bronze.tags;").show()

+--------+
|count(1)|
+--------+
| 1093360|
+--------+



### EXPLAIN statement is used to provide logical/physical plans

In [17]:
s_session_delta.sql("EXPLAIN EXTENDED SELECT * FROM bronze.tags ORDER BY userId LIMIT 150;").head(200)

[Row(plan="== Parsed Logical Plan ==\n'GlobalLimit 150\n+- 'LocalLimit 150\n   +- 'Sort ['userId ASC NULLS FIRST], true\n      +- 'Project [*]\n         +- 'UnresolvedRelation [bronze, tags], [], false\n\n== Analyzed Logical Plan ==\nuserId: int, movieId: int, tag: string, timestamp: int, created_datetime: timestamp, created_date_year: int, created_date_month: int, created_date_day: int, modified_datetime: timestamp\nGlobalLimit 150\n+- LocalLimit 150\n   +- Sort [userId#1533 ASC NULLS FIRST], true\n      +- Project [userId#1533, movieId#1534, tag#1535, timestamp#1536, created_datetime#1537, created_date_year#1538, created_date_month#1539, created_date_day#1540, modified_datetime#1541]\n         +- SubqueryAlias spark_catalog.bronze.tags\n            +- Relation bronze.tags[userId#1533,movieId#1534,tag#1535,timestamp#1536,created_datetime#1537,created_date_year#1538,created_date_month#1539,created_date_day#1540,modified_datetime#1541] parquet\n\n== Optimized Logical Plan ==\nGlobalLimi

In [18]:
s_session_delta.sql("EXPLAIN CODEGEN SELECT * FROM bronze.tags ORDER BY userId LIMIT 150;").head(200)

[Row(plan='Found 1 WholeStageCodegen subtrees.\n== Subtree 1 / 1 (maxMethodCodeSize:562; maxConstantPoolSize:145(0.22% used); numInnerClasses:0) ==\n*(1) ColumnarToRow\n+- FileScan parquet bronze.tags[userId#1613,movieId#1614,tag#1615,timestamp#1616,created_datetime#1617,created_date_day#1620,modified_datetime#1621,created_date_year#1618,created_date_month#1619] Batched: true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[hdfs://hdpmaster:9000/deltalake/bronze.tags], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<userId:int,movieId:int,tag:string,timestamp:int,created_datetime:timestamp,created_date_da...\n\nGenerated code:\n/* 001 */ public Object generate(Object[] references) {\n/* 002 */   return new GeneratedIteratorForCodegenStage1(references);\n/* 003 */ }\n/* 004 */\n/* 005 */ // codegenStageId=1\n/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {\n/* 007 */   private Obj

In [19]:
s_session_delta.sql("EXPLAIN COST SELECT * FROM bronze.tags ORDER BY userId LIMIT 150;").head(200)

[Row(plan='== Optimized Logical Plan ==\nGlobalLimit 150, Statistics(sizeInBytes=10.0 KiB, rowCount=150)\n+- LocalLimit 150, Statistics(sizeInBytes=21.0 MiB)\n   +- Sort [userId#1693 ASC NULLS FIRST], true, Statistics(sizeInBytes=21.0 MiB)\n      +- Relation bronze.tags[userId#1693,movieId#1694,tag#1695,timestamp#1696,created_datetime#1697,created_date_year#1698,created_date_month#1699,created_date_day#1700,modified_datetime#1701] parquet, Statistics(sizeInBytes=21.0 MiB)\n\n== Physical Plan ==\nTakeOrderedAndProject(limit=150, orderBy=[userId#1693 ASC NULLS FIRST], output=[userId#1693,movieId#1694,tag#1695,timestamp#1696,created_datetime#1697,created_date_year#1698,created_date_month#1699,created_date_day#1700,modified_datetime#1701])\n+- *(1) ColumnarToRow\n   +- FileScan parquet bronze.tags[userId#1693,movieId#1694,tag#1695,timestamp#1696,created_datetime#1697,created_date_day#1700,modified_datetime#1701,created_date_year#1698,created_date_month#1699] Batched: true, DataFilters: [],

In [20]:
s_session_delta.sql("EXPLAIN FORMATTED SELECT * FROM bronze.tags ORDER BY userId LIMIT 150;").head(200)

[Row(plan='== Physical Plan ==\nTakeOrderedAndProject (3)\n+- * ColumnarToRow (2)\n   +- Scan parquet bronze.tags (1)\n\n\n(1) Scan parquet bronze.tags\nOutput [9]: [userId#1773, movieId#1774, tag#1775, timestamp#1776, created_datetime#1777, created_date_day#1780, modified_datetime#1781, created_date_year#1778, created_date_month#1779]\nBatched: true\nLocation: PreparedDeltaFileIndex [hdfs://hdpmaster:9000/deltalake/bronze.tags]\nReadSchema: struct<userId:int,movieId:int,tag:string,timestamp:int,created_datetime:timestamp,created_date_day:int,modified_datetime:timestamp>\n\n(2) ColumnarToRow [codegen id : 1]\nInput [9]: [userId#1773, movieId#1774, tag#1775, timestamp#1776, created_datetime#1777, created_date_day#1780, modified_datetime#1781, created_date_year#1778, created_date_month#1779]\n\n(3) TakeOrderedAndProject\nInput [9]: [userId#1773, movieId#1774, tag#1775, timestamp#1776, created_datetime#1777, created_date_day#1780, modified_datetime#1781, created_date_year#1778, created_da

### Get information about Delta objects

In [70]:
deltaTable_tags_by_path = DeltaTable.forPath(sparkSession=s_session_delta, path="hdfs://hdpmaster:9000/deltalake/bronze/tags")
deltaTable_tags_by_path.detail().head(10)

[Row(format='delta', id='772a8dd7-e548-4ec4-ad09-f1ac0b88b1c3', name=None, description='Table to store the genre for each movie.', location='hdfs://hdpmaster:9000/deltalake/bronze/tags', createdAt=datetime.datetime(2022, 10, 20, 18, 28, 18, 228000), lastModified=datetime.datetime(2022, 10, 20, 18, 29, 6, 225000), partitionColumns=['created_date_year', 'created_date_month'], numFiles=48, sizeInBytes=11238012, properties={'description': 'Table to store the genre for each movie.'}, minReaderVersion=1, minWriterVersion=4)]

In [71]:
s_session_delta.sql("DESCRIBE DETAIL 'hdfs://hdpmaster:9000/deltalake/bronze/tags'").head(10)

[Row(format='delta', id='772a8dd7-e548-4ec4-ad09-f1ac0b88b1c3', name=None, description='Table to store the genre for each movie.', location='hdfs://hdpmaster:9000/deltalake/bronze/tags', createdAt=datetime.datetime(2022, 10, 20, 18, 28, 18, 228000), lastModified=datetime.datetime(2022, 10, 20, 18, 29, 6, 225000), partitionColumns=['created_date_year', 'created_date_month'], numFiles=48, sizeInBytes=11238012, properties={'description': 'Table to store the genre for each movie.'}, minReaderVersion=1, minWriterVersion=4)]

In [72]:
deltaTable_tags_by_path.history().head(10)

[Row(version=1, timestamp=datetime.datetime(2022, 10, 20, 18, 29, 6, 225000), userId=None, userName=None, operation='WRITE', operationParameters={'mode': 'Append', 'partitionBy': '[]'}, job=None, notebook=None, clusterId=None, readVersion=0, isolationLevel='Serializable', isBlindAppend=True, operationMetrics={'numOutputRows': '1093360', 'numOutputBytes': '11238012', 'numFiles': '48'}, userMetadata=None, engineInfo='Apache-Spark/3.3.0 Delta-Lake/2.1.0'),
 Row(version=0, timestamp=datetime.datetime(2022, 10, 20, 18, 28, 18, 275000), userId=None, userName=None, operation='CREATE TABLE', operationParameters={'description': 'Table to store the genre for each movie.', 'partitionBy': '["created_date_year","created_date_month"]', 'properties': '{"description":"Table to store the genre for each movie."}', 'isManaged': 'false'}, job=None, notebook=None, clusterId=None, readVersion=None, isolationLevel='Serializable', isBlindAppend=True, operationMetrics={}, userMetadata=None, engineInfo='Apache-

In [75]:
s_session_delta.sql("DESCRIBE HISTORY 'hdfs://hdpmaster:9000/deltalake/bronze/tags' LIMIT 10").head(10)

[Row(version=1, timestamp=datetime.datetime(2022, 10, 20, 18, 29, 6, 225000), userId=None, userName=None, operation='WRITE', operationParameters={'mode': 'Append', 'partitionBy': '[]'}, job=None, notebook=None, clusterId=None, readVersion=0, isolationLevel='Serializable', isBlindAppend=True, operationMetrics={'numOutputRows': '1093360', 'numOutputBytes': '11238012', 'numFiles': '48'}, userMetadata=None, engineInfo='Apache-Spark/3.3.0 Delta-Lake/2.1.0'),
 Row(version=0, timestamp=datetime.datetime(2022, 10, 20, 18, 28, 18, 275000), userId=None, userName=None, operation='CREATE TABLE', operationParameters={'description': 'Table to store the genre for each movie.', 'partitionBy': '["created_date_year","created_date_month"]', 'properties': '{"description":"Table to store the genre for each movie."}', 'isManaged': 'false'}, job=None, notebook=None, clusterId=None, readVersion=None, isolationLevel='Serializable', isBlindAppend=True, operationMetrics={}, userMetadata=None, engineInfo='Apache-

In [76]:
deltaTable_tags_by_tablename = DeltaTable.forName(sparkSession=s_session_delta,tableOrViewName='bronze.tags')
deltaTable_tags_by_tablename.detail().head(10)

[Row(format='delta', id='772a8dd7-e548-4ec4-ad09-f1ac0b88b1c3', name=None, description='Table to store the genre for each movie.', location='hdfs://hdpmaster:9000/deltalake/bronze/tags', createdAt=datetime.datetime(2022, 10, 20, 18, 28, 18, 228000), lastModified=datetime.datetime(2022, 10, 20, 18, 29, 6, 225000), partitionColumns=['created_date_year', 'created_date_month'], numFiles=48, sizeInBytes=11238012, properties={'description': 'Table to store the genre for each movie.'}, minReaderVersion=1, minWriterVersion=4)]

In [77]:
s_session_delta.sql("DESCRIBE DETAIL bronze.tags").head(10)

[Row(format='delta', id='772a8dd7-e548-4ec4-ad09-f1ac0b88b1c3', name='bronze.tags', description='Table to store the genre for each movie.', location='hdfs://hdpmaster:9000/deltalake/bronze/tags', createdAt=datetime.datetime(2022, 10, 20, 18, 28, 18, 228000), lastModified=datetime.datetime(2022, 10, 20, 18, 29, 6, 225000), partitionColumns=['created_date_year', 'created_date_month'], numFiles=48, sizeInBytes=11238012, properties={'description': 'Table to store the genre for each movie.'}, minReaderVersion=1, minWriterVersion=4)]

In [27]:
deltaTable_tags_by_tablename.history().head(10)

[Row(version=2, timestamp=datetime.datetime(2022, 10, 20, 17, 40, 53, 170000), userId=None, userName=None, operation='WRITE', operationParameters={'mode': 'Append', 'partitionBy': '[]'}, job=None, notebook=None, clusterId=None, readVersion=1, isolationLevel='Serializable', isBlindAppend=True, operationMetrics={'numOutputRows': '1093360', 'numOutputBytes': '11022615', 'numFiles': '36'}, userMetadata=None, engineInfo='Apache-Spark/3.3.0 Delta-Lake/2.1.0'),
 Row(version=1, timestamp=datetime.datetime(2022, 10, 20, 16, 20, 9, 834000), userId=None, userName=None, operation='WRITE', operationParameters={'mode': 'Append', 'partitionBy': '[]'}, job=None, notebook=None, clusterId=None, readVersion=0, isolationLevel='Serializable', isBlindAppend=True, operationMetrics={'numOutputRows': '1093360', 'numOutputBytes': '11022615', 'numFiles': '36'}, userMetadata=None, engineInfo='Apache-Spark/3.3.0 Delta-Lake/2.1.0'),
 Row(version=0, timestamp=datetime.datetime(2022, 10, 20, 16, 11, 14, 399000), user

In [78]:
s_session_delta.sql("DESCRIBE HISTORY bronze.tags").head(10)

[Row(version=1, timestamp=datetime.datetime(2022, 10, 20, 18, 29, 6, 225000), userId=None, userName=None, operation='WRITE', operationParameters={'mode': 'Append', 'partitionBy': '[]'}, job=None, notebook=None, clusterId=None, readVersion=0, isolationLevel='Serializable', isBlindAppend=True, operationMetrics={'numOutputRows': '1093360', 'numOutputBytes': '11238012', 'numFiles': '48'}, userMetadata=None, engineInfo='Apache-Spark/3.3.0 Delta-Lake/2.1.0'),
 Row(version=0, timestamp=datetime.datetime(2022, 10, 20, 18, 28, 18, 275000), userId=None, userName=None, operation='CREATE TABLE', operationParameters={'description': 'Table to store the genre for each movie.', 'partitionBy': '["created_date_year","created_date_month"]', 'properties': '{"description":"Table to store the genre for each movie."}', 'isManaged': 'false'}, job=None, notebook=None, clusterId=None, readVersion=None, isolationLevel='Serializable', isBlindAppend=True, operationMetrics={}, userMetadata=None, engineInfo='Apache-

In [79]:
s_session_delta.sql("SHOW COLUMNS IN  bronze.tags").head(10)

[Row(col_name='userId'),
 Row(col_name='movieId'),
 Row(col_name='tag'),
 Row(col_name='timestamp'),
 Row(col_name='created_datetime'),
 Row(col_name='created_date_year'),
 Row(col_name='created_date_month'),
 Row(col_name='created_date_day'),
 Row(col_name='modified_datetime')]

In [80]:
s_session_delta.sql("DESCRIBE TABLE bronze.tags").head(10)

[Row(col_name='userId', data_type='int', comment=''),
 Row(col_name='movieId', data_type='int', comment=''),
 Row(col_name='tag', data_type='string', comment='Movie genre.'),
 Row(col_name='timestamp', data_type='int', comment=''),
 Row(col_name='created_datetime', data_type='timestamp', comment=''),
 Row(col_name='created_date_year', data_type='int', comment=''),
 Row(col_name='created_date_month', data_type='int', comment=''),
 Row(col_name='created_date_day', data_type='int', comment=''),
 Row(col_name='modified_datetime', data_type='timestamp', comment=''),
 Row(col_name='', data_type='', comment='')]

In [81]:
s_session_delta.sql("DESCRIBE TABLE EXTENDED bronze.tags").head(10)

[Row(col_name='userId', data_type='int', comment=''),
 Row(col_name='movieId', data_type='int', comment=''),
 Row(col_name='tag', data_type='string', comment='Movie genre.'),
 Row(col_name='timestamp', data_type='int', comment=''),
 Row(col_name='created_datetime', data_type='timestamp', comment=''),
 Row(col_name='created_date_year', data_type='int', comment=''),
 Row(col_name='created_date_month', data_type='int', comment=''),
 Row(col_name='created_date_day', data_type='int', comment=''),
 Row(col_name='modified_datetime', data_type='timestamp', comment=''),
 Row(col_name='', data_type='', comment='')]

### Optimize table

In [82]:
deltaTable_tags_by_tablename.optimize().executeCompaction()

                                                                                

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,totalClusterParallelism:bigint,totalScheduledTasks:bigint,autoCompactParallelismStats:struct<maxClusterActiveParallelism:bigint,minClusterActiveParallelism:bigint,maxSessionActiveParallelism:bigint,minSessionActiveParallelism:bigint>>]

In [41]:
#s_session_delta.sql("DROP TABLE IF EXISTS bronze.tags").head(10)

[]