In [1]:
from pyspark.sql.types import StringType, DoubleType, LongType, StructField, StructType

import import_ipynb
from raw_etl import RawETL, merge

importing Jupyter notebook from raw_etl.ipynb


In [2]:
class BronzeETL(RawETL):
    """Streaming ETL step that forwards change-feed rows of a source table to a target table.

    The pipeline is `extract` -> `transform` -> `load`:

    * `extract` opens a streaming read of the source table's change feed,
    * `transform` casts the columns listed in `schema`,
    * `load` starts the streaming queries; each micro-batch is handed to `merge`.

    The class reads `self.spark` and `self.pk`, neither of which is assigned
    here -- presumably both are provided by `RawETL`; confirm against raw_etl.
    """

    # Columns cast by `transform`, together with the Spark type each is cast to.
    schema = StructType([
        StructField('idx', LongType(), False),
        StructField('value', DoubleType(), False),
    ])

    def extract(self, source_table, stg_table, spark_session=None):
        """Open a streaming change-feed read of `source_table`.

        The stream starts at the highest version currently reported by
        `describe history`, so only that commit and later ones are read.

        Args:
            source_table: Name of the table to stream changes from.
            stg_table: Unused here; kept so the signature stays compatible
                with existing callers.
            spark_session: Unused here; `self.spark` is always used.

        Returns:
            A streaming DataFrame over the change feed of `source_table`.
        """
        # Pick the newest version on the Spark side: this does not depend on the
        # row order of the history listing and avoids a pandas round trip.
        last_version = (
            self.spark.sql(f'describe history {source_table}')
            .selectExpr('max(version) as version')
            .first()['version']
        )

        # NOTE(review): 'delta.enableChangeDataFeed' looks like a table property
        # rather than a reader option -- confirm whether it is needed here.
        stream = self.spark.readStream.format("delta") \
             .option('delta.enableChangeDataFeed', True) \
             .option("readChangeFeed", "true") \
             .option("startingVersion", last_version) \
             .table(source_table)

        return stream

    def transform(self, df):
        """Cast every column named in `schema` to its declared data type.

        Columns of `df` that are not listed in `schema` are left untouched.

        Args:
            df: DataFrame that contains at least the columns in `schema`.

        Returns:
            The DataFrame with the schema columns cast.
        """
        for field in self.schema.fields:
            df = df.withColumn(field.name, df[field.name].cast(field.dataType))

        return df

    def merge(self, batch_df, batch_id):
        """Merge one micro-batch of change rows into `self.target_table`.

        Only rows whose `_change_type` is 'insert' or 'update_postimage' are
        kept; every other change type is filtered out before merging (the
        notebook output also shows 'update_preimage' and 'delete' rows, which
        are therefore not propagated).

        Args:
            batch_df: DataFrame holding the rows of the current micro-batch.
            batch_id: Micro-batch identifier passed by `foreachBatch`; unused.
        """
        batch_df = batch_df.filter("_change_type = 'update_postimage' or _change_type = 'insert'")

        # Module-level `merge` imported from raw_etl, not this method.
        merge(batch_df, self.target_table, self.pk, self.spark)

    def load(self, stream, target_table):
        """Start the streaming writes for `stream`.

        Two queries are started:

        * one that passes every micro-batch to `self.merge`, and
        * one that appends the stream to the table "stream".

        Args:
            stream: Streaming DataFrame to write.
            target_table: Table the micro-batches are merged into; stored on
                the instance because `merge` reads it.

        Returns:
            The streaming query that runs `self.merge` per micro-batch. The
            query writing to the "stream" table is kept in
            `self.stream_table_query`.
        """
        self.target_table = target_table

        # NOTE(review): unlike the query below, this one sets no
        # checkpointLocation -- confirm that is intended.
        query = stream.writeStream \
              .format('delta') \
              .outputMode('update') \
              .foreachBatch(self.merge).start()

        # Keep the handle so this second stream can be stopped or awaited too.
        self.stream_table_query = stream.writeStream.format('delta').outputMode('append') \
              .option("checkpointLocation", "./checkpoints/stream") \
              .toTable("stream")

        return query

    def etl(self, source_table, stg_table, target_table):
        """Run extract, transform and load for the given tables.

        Args:
            source_table: Table whose change feed is streamed.
            stg_table: Forwarded to `extract`, which does not use it.
            target_table: Table the changes are merged into.

        Returns:
            The streaming query returned by `load`, so callers can await or
            stop it.
        """
        stream = self.extract(source_table, stg_table)
        stream = self.transform(stream)
        return self.load(stream, target_table)

In [3]:
if __name__ == '__main__':
    # Build a Delta-enabled Spark session and start the bronze ETL streams.

    from pyspark.sql import SparkSession
    from delta.pip_utils import configure_spark_with_delta_pip

    builder = SparkSession.builder\
           .appName('raw_etl')\
           .config('spark.sql.warehouse.dir', 'pyspark_tables')\
           .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
           .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
           .config('spark.databricks.delta.retentionDurationCheck.enabled', False) \
           .config('spark.databricks.delta.schema.autoMerge.enabled', True) \
           .config('spark.databricks.delta.checkLatestSchemaOnRead', True) \
           .config('delta.enableChangeDataFeed', True)

    spark = configure_spark_with_delta_pip(builder).enableHiveSupport().getOrCreate()

    TARGET_TABLE = 'dummy.bronze'
    SOURCE_TABLE = 'dummy.raw'
    STG_TABLE = 'dummy.stg'

    # The class defined in this notebook is `BronzeETL`; no `bronzeEtl` name is
    # defined or imported, so that spelling fails on a fresh kernel.
    etl = BronzeETL(spark)
    etl.etl(SOURCE_TABLE, STG_TABLE, TARGET_TABLE)



23/02/15 15:27:38 WARN Utils: Your hostname, spiriel resolves to a loopback address: 127.0.1.1; using 192.168.15.6 instead (on interface enp3s0)
23/02/15 15:27:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/ahow/main_env/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ahow/.ivy2/cache
The jars for the packages stored in: /home/ahow/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0743322c-9090-4cf6-a48b-bbec83858df5;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.2.0 in central
	found io.delta#delta-storage;2.2.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
:: resolution report :: resolve 104ms :: artifacts dl 5ms
	:: modules in use:
	io.delta#delta-core_2.12;2.2.0 from central in [default]
	io.delta#delta-storage;2.2.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |

23/02/15 15:27:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/15 15:27:45 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


  series = series.astype(t, copy=False)


23/02/15 15:27:49 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-20dfb7a4-1089-4ad3-9470-a4b14efad7db. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/02/15 15:27:49 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.

 inside batch 
 


  series = series.astype(t, copy=False)


Unnamed: 0,idx,value,type,_change_type,_commit_version,_commit_timestamp
0,100,3.14,range,update_preimage,113,2023-02-11 04:09:03.511
1,100,0.0,range,update_postimage,113,2023-02-11 04:09:03.511


23/02/15 15:27:50 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [4]:
# series = spark.sql('describe history dummy.bronze').show()
# series = spark.sql('describe history dummy.bronze').toPandas()
# series = series[series['version'] == series['version'].max() ]['operationMetrics']
# for s in series:
#     for k,v in s.items():
#         print(k, v)
#     print('\n')
#spark.sql('alter table dummy.bronze add partition (idx)').show()
#spark.sql('ALTER TABLE dummy.raw SET TBLPROPERTIES (delta.enableChangeDataFeed=true)').toPandas()
#df = spark.read.table('dummy.bronze')
#display(df.toPandas())
#df.write.partitionBy('idx').format('delta').mode('overwrite').saveAsTable('dummy.bronze_')

#spark.sql('drop table dummy.bronze;').show()
#spark.sql('alter table dummy.bronze_ rename to dummy.bronze;').show()

+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|     72|2023-02-11 04:13:...|  null|    null|    MERGE|{predicate -> (t....|null|    null|     null|         71|  Serializable|        false|{numTargetRowsCop...|        null|Apache-Spark/3.3....|
|     71|2023-02-11 04:08:...|  null|    null|    MERGE|{predicate -> (t....|null|    null|     null|         70|  Serializable|        false|{numTargetRowsCop...|        null|Apache-Spark/3.3....|
|     70|2

  series = series.astype(t, copy=False)


In [5]:
# spark.sql('select * from stream').toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,idx,value,type,_change_type,_commit_version,_commit_timestamp
0,100,3.14,range,update_preimage,113,2023-02-11 04:09:03.511
1,100,0.0,range,update_postimage,113,2023-02-11 04:09:03.511
2,3,0.0,asdf,delete,112,2023-02-11 04:06:06.226


In [37]:
# print(spark.sql('describe history dummy.bronze').toPandas()['operationMetrics'][0])
# spark.sql("select count(1) from dummy.bronze").show()
# spark.sql('select value from dummy.bronze where idx=3').show()
# spark.sql('select idx, count(1) c from dummy.bronze group by idx having c >1 ').show()

  series = series.astype(t, copy=False)


{'numOutputRows': '1', 'numTargetRowsInserted': '0', 'numTargetRowsUpdated': '1', 'numTargetFilesAdded': '1', 'numTargetFilesRemoved': '1', 'numTargetRowsDeleted': '0', 'scanTimeMs': '401', 'numSourceRows': '1', 'numTargetChangeFilesAdded': '0', 'executionTimeMs': '596', 'numTargetRowsCopied': '0', 'rewriteTimeMs': '181'}
+--------+
|count(1)|
+--------+
|     917|
+--------+

+-----+
|value|
+-----+
|  0.0|
+-----+

+---+---+
|idx|  c|
+---+---+
+---+---+



In [33]:
# #vals = []
# #for i in range(200, 1000):
# #    vals.append(f'({i}, 3.14, "range")')
# #spark.sql(f'insert into dummy.raw(idx, value, type) values {",".join(vals)}').show()
# #spark.sql('update dummy.raw set value=0 where idx=3').show()
# spark.sql('update dummy.raw set value=0 where idx=100').show()
# #spark.sql('select * from dummy.raw').show()
# spark.sql('describe history dummy.raw').toPandas()['operationMetrics'][0]


 inside batch 
 


  series = series.astype(t, copy=False)


Unnamed: 0,idx,value,type,_change_type,_commit_version,_commit_timestamp
0,100,3.14,range,update_preimage,113,2023-02-11 04:09:03.511
1,100,0.0,range,update_postimage,113,2023-02-11 04:09:03.511



 inside batch 
 


  series = series.astype(t, copy=False)


Unnamed: 0,idx,value,type,_change_type,_commit_version,_commit_timestamp
0,100,3.14,range,update_preimage,113,2023-02-11 04:09:03.511
1,100,0.0,range,update_postimage,113,2023-02-11 04:09:03.511



 inside batch 
 
+-----------------+
|num_affected_rows|
+-----------------+
|                1|
+-----------------+



  series = series.astype(t, copy=False)


Unnamed: 0,idx,value,type,_change_type,_commit_version,_commit_timestamp
0,100,3.14,range,update_preimage,113,2023-02-11 04:09:03.511
1,100,0.0,range,update_postimage,113,2023-02-11 04:09:03.511


  series = series.astype(t, copy=False)


23/02/11 04:09:05 ERROR MicroBatchExecution: Query [id = 89c6e5fa-bc5b-41eb-bdf0-8b267f9dd10b, runId = f29e9e28-e64e-4d03-9a05-20d93be18bf6] terminated with error
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/home/ahow/main_env/lib/python3.10/site-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/home/ahow/main_env/lib/python3.10/site-packages/pyspark/sql/utils.py", line 272, in call
    raise e
  File "/home/ahow/main_env/lib/python3.10/site-packages/pyspark/sql/utils.py", line 269, in call
    self.func(DataFrame(jdf, self.session), batch_id)
  File "/tmp/ipykernel_135192/2736145398.py", line 34, in merge
    merge(batch_df, self.target_table, self.pk, self.spark)
  File "<string>", line 27, in merge
  File "/home/ahow/main_env/lib/python3.10/site-packages/delta/tables.py", line 938, in execute
    self._jbuilder.execute()
  Fil

{'numAddedFiles': '1',
 'scanTimeMs': '116',
 'numCopiedRows': '0',
 'executionTimeMs': '246',
 'numAddedChangeFiles': '1',
 'numUpdatedRows': '1',
 'numRemovedFiles': '1',
 'rewriteTimeMs': '130'}

23/02/11 04:09:05 ERROR MicroBatchExecution: Query [id = 3a0f897c-1526-4036-9169-9aaaf2db2d81, runId = 23cc923c-057e-4517-850d-bef16d482d9b] terminated with error
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/home/ahow/main_env/lib/python3.10/site-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/home/ahow/main_env/lib/python3.10/site-packages/pyspark/sql/utils.py", line 272, in call
    raise e
  File "/home/ahow/main_env/lib/python3.10/site-packages/pyspark/sql/utils.py", line 269, in call
    self.func(DataFrame(jdf, self.session), batch_id)
  File "/tmp/ipykernel_135192/2039092094.py", line 35, in merge
    merge(batch_df, self.target_table, self.pk, self.spark)
  File "<string>", line 27, in merge
  File "/home/ahow/main_env/lib/python3.10/site-packages/delta/tables.py", line 938, in execute
    self._jbuilder.execute()
  Fil

In [11]:
# spark.sql('select count(1) from stream').show()
# spark.sql('drop table stream').show()

+--------+
|count(1)|
+--------+
|       2|
+--------+

++
||
++
++



In [12]:
# last_version = spark.sql('describe history dummy.raw').toPandas().loc[0, 'version']

# spark.read.format("delta") \
#   .option("readChangeFeed", "true") \
#   .option("startingVersion", last_version) \
#   .table("dummy.raw").count()

  series = series.astype(t, copy=False)


2