### Este notebook é utilizado para o desenvolvimento do script pyspark da camada bronze

In [2]:
%%configure -f
{
    "conf": {
        "spark.jars.packages": "io.delta:delta-core_2.12:1.0.0",
        "spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog",
        "--datalake-formats": "delta"
    }
}

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import json
from pyspark.sql import types
from delta import *
from pyspark.sql.functions import *
from delta.tables import *
from pyspark.sql.functions import *
from datetime import datetime

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,,pyspark,idle,,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Get (or create) the session with the Delta SQL extension and catalog configured.
delta_builder = (
    SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = delta_builder.getOrCreate()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
spark

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.session.SparkSession object at 0x7fbc7d39cb20>

### Load dos arquivos raw data local para ingestão em delta tables

In [6]:
%%sh
ls ../../data_set

characters.csv
characters_To_Comics.csv
charcters_stats.csv
comics.csv
delta
marvel_characters_info.csv
marvel_dc_characters.csv
marvel_dc_characters.xlsx
superheroes_power_matrix.csv


In [7]:
%%sh
ls schemas/characters.json

schemas/characters.json


In [8]:
%%sh
pwd

/home/glue_user/workspace/jupyter_workspace/src/bronze_data


### Load Characters

In [9]:
# Parameters for the characters load: data directory, CSV file name,
# whether the CSV has a header row, and the JSON schema file.
nam_path = '/home/glue_user/workspace/jupyter_workspace/data_set'
nam_file = 'characters.csv'
bol_header = True
json_path = '/home/glue_user/workspace/jupyter_workspace/src/bronze_data/schemas/characters.json'

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Parse the schema definition file into a plain dict.
with open(json_path, 'r') as schema_file:
    data = json.load(schema_file)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Hand-written schema: an integer characterID and a string name, both nullable.
schema = StructType(
    [
        StructField("characterID", IntegerType(), True),
        StructField("name", StringType(), True),
    ]
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
schemaFromJson = StructType.fromJson(data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
df=spark.read.csv(f'{nam_path}/{nam_file}', header=bol_header, schema=schemaFromJson)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
# Load date stamp (YYYYMMDD) taken from the current local time.
desc_date = datetime.now().strftime('%Y%m%d')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
df=df.withColumn('dat_load', lit(desc_date))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
df.show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+--------------------+--------+
|characterID|                name|dat_load|
+-----------+--------------------+--------+
|    1009220|     Captain America|20230301|
|    1010740|      Winter Soldier|20230301|
|    1009471|           Nick Fury|20230301|
|    1009552|        S.H.I.E.L.D.|20230301|
|    1009228|       Sharon Carter|20230301|
|    1011109|    X-Men (Ultimate)|20230301|
|    1010808|Hawkeye (Kate Bis...|20230301|
|    1009515|            Punisher|20230301|
|    1009252|          Crossbones|20230301|
|    1009535|           Red Skull|20230301|
+-----------+--------------------+--------+
only showing top 10 rows

In [17]:
df.show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+--------------------+--------+
|characterID|                name|dat_load|
+-----------+--------------------+--------+
|    1009220|     Captain America|20230301|
|    1010740|      Winter Soldier|20230301|
|    1009471|           Nick Fury|20230301|
|    1009552|        S.H.I.E.L.D.|20230301|
|    1009228|       Sharon Carter|20230301|
|    1011109|    X-Men (Ultimate)|20230301|
|    1010808|Hawkeye (Kate Bis...|20230301|
|    1009515|            Punisher|20230301|
|    1009252|          Crossbones|20230301|
|    1009535|           Red Skull|20230301|
+-----------+--------------------+--------+
only showing top 10 rows

In [18]:
print(df.schema.json())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{"fields":[{"metadata":{},"name":"characterID","nullable":true,"type":"integer"},{"metadata":{},"name":"name","nullable":true,"type":"string"},{"metadata":{},"name":"dat_load","nullable":false,"type":"string"}],"type":"struct"}

### Uso do delta para aprendizado

In [21]:
df.write.format("delta").mode("overwrite").save(f'{nam_path}/delta/characters')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Lendo dados no formato delta

Reading files from delta tables

In [51]:
df = spark.read.format("delta").load(f'{nam_path}/delta/characters')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [52]:
df.show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+--------------------+--------+
|characterID|                name|dat_load|
+-----------+--------------------+--------+
|    1009220|     Captain America|20230209|
|    1010740|      Winter Soldier|20230209|
|    1009471|           Nick Fury|20230209|
|    1009552|        S.H.I.E.L.D.|20230209|
|    1009228|       Sharon Carter|20230209|
|    1011109|    X-Men (Ultimate)|20230209|
|    1010808|Hawkeye (Kate Bis...|20230209|
|    1009515|            Punisher|20230209|
|    1009252|          Crossbones|20230209|
|    1009535|           Red Skull|20230209|
+-----------+--------------------+--------+
only showing top 10 rows

In [21]:
df=df.filter(df['characterID'].isin(1009220,1009471))

In [22]:
df.show(10)

+-----------+---------------+
|characterID|           name|
+-----------+---------------+
|    1009220|Captain America|
|    1009471|      Nick Fury|
+-----------+---------------+



In [53]:
%%sh
aws s3 ls


An error occurred (RequestTimeTooSkewed) when calling the ListBuckets operation: The difference between the request time and the current time is too large.


CalledProcessError: Command 'b'aws s3 ls\n'' returned non-zero exit status 255.

### Overwrite

In [18]:
df.write.format("delta").mode("overwrite").save(f'{nam_path}/delta/characters')

In [24]:
df = spark.read.format("delta").load(f'{nam_path}/delta/characters')

In [25]:
df.show(10)

+-----------+---------------+
|characterID|           name|
+-----------+---------------+
|    1009220|Captain America|
|    1009471|      Nick Fury|
+-----------+---------------+



### Update

In [21]:
deltaTable = DeltaTable.forPath(spark, f'{nam_path}/delta/characters')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
`..//..//data_set/delta/characters` is not a Delta table.
Traceback (most recent call last):
  File "/tmp/spark-d6152a16-d250-43b4-abff-65892274b8f0/userFiles-4b6cfa09-d967-446a-8312-72233aec7cd9/io.delta_delta-core_2.12-1.0.0.jar/delta/tables.py", line 292, in forPath
    jdt = sparkSession._sc._jvm.io.delta.tables.DeltaTable.forPath(
  File "/home/glue_user/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/home/glue_user/spark/python/pyspark/sql/utils.py", line 196, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: `..//..//data_set/delta/characters` is not a Delta table.



In [27]:
# Rename one character; the new value is written as a quoted SQL string literal.
name_update = {"name": "'Captain America___'"}
deltaTable.update("characterID = 1009220", name_update)

In [28]:
deltaTable.toDF().orderBy("characterID").show(truncate=False)

+-----------+------------------+
|characterID|name              |
+-----------+------------------+
|1009220    |Captain America___|
|1009471    |Nick Fury         |
+-----------+------------------+



### Delete

In [29]:
deltaTable = DeltaTable.forPath(spark, f'{nam_path}/delta/characters')
deltaTable.delete("characterID=1009220")

In [30]:
deltaTable.toDF().show(10)

+-----------+---------+
|characterID|     name|
+-----------+---------+
|    1009471|Nick Fury|
+-----------+---------+



### Merge

In [31]:
df_tmp=spark.read.csv(f'{nam_path}/{nam_file}', header=bol_header, schema=schemaFromJson)

In [32]:
df_tmp.show(10)

+-----------+--------------------+
|characterID|                name|
+-----------+--------------------+
|    1009220|     Captain America|
|    1010740|      Winter Soldier|
|    1009471|           Nick Fury|
|    1009552|        S.H.I.E.L.D.|
|    1009228|       Sharon Carter|
|    1011109|    X-Men (Ultimate)|
|    1010808|Hawkeye (Kate Bis...|
|    1009515|            Punisher|
|    1009252|          Crossbones|
|    1009535|           Red Skull|
+-----------+--------------------+
only showing top 10 rows



In [33]:
deltaTable = DeltaTable.forPath(spark, f'{nam_path}/delta/characters')

In [34]:
deltaTable.toDF().show(10)

+-----------+---------+
|characterID|     name|
+-----------+---------+
|    1009471|Nick Fury|
+-----------+---------+



In [39]:
# Upsert the CSV rows into the Delta table: rows matching on characterID are
# updated, all others are inserted.
merge_condition = 'characters.characterID = characters_tmp.characterID'
(
    deltaTable.alias('characters')
    .merge(df_tmp.alias('characters_tmp'), merge_condition)
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

https://docs.delta.io/latest/delta-update.html#language-python

In [40]:
deltaTable = DeltaTable.forPath(spark, f'{nam_path}/delta/characters')

In [41]:
deltaTable=deltaTable.toDF()

In [42]:
deltaTable.filter(deltaTable['characterID']==1009471).show(10)

+-----------+---------+
|characterID|     name|
+-----------+---------+
|    1009471|Nick Fury|
+-----------+---------+



In [43]:
deltaTable.show()

+-----------+--------------------+
|characterID|                name|
+-----------+--------------------+
|    1009144|              A.I.M.|
|    1009146|Abomination (Emil...|
|    1009148|       Absorbing Man|
|    1009149|               Abyss|
|    1009150|          Agent Zero|
|    1009151|               Amiko|
|    1009152|         Ancient One|
|    1009153|Angel (Warren Wor...|
|    1009154|           Annihilus|
|    1009156|          Apocalypse|
|    1009157|Spider-Girl (Anya...|
|    1009158|              Arcade|
|    1009159|           Archangel|
|    1009160|            Arclight|
|    1009163|              Aurora|
|    1009164|           Avalanche|
|    1009165|            Avengers|
|    1009167|        Bruce Banner|
|    1009168|             Banshee|
|    1009169|      Baron Strucker|
+-----------+--------------------+
only showing top 20 rows



In [99]:
table = DeltaTable.forPath(spark, f'{nam_path}/delta/characters')

In [100]:
table.vacuum(1000)

DataFrame[]

In [42]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import json
from pyspark.sql import types
from delta import *
from pyspark.sql.functions import *
from delta.tables import *
from pyspark.sql.functions import *
from datetime import datetime

class ProcessBronzeData:
    """Load one raw CSV file into a bronze-layer Delta table.

    The CSV is read from ``<path_source>/<nam_file>.csv`` and the Delta table
    lives at ``<path_target>/delta/<nam_file>``.
    """

    def __init__(self, nam_file, path_source=None, path_target=None):
        """Store the dataset name and the source/target base directories.

        Args:
            nam_file: Dataset name without extension (e.g. ``'characters'``).
            path_source: Directory that contains ``<nam_file>.csv``.
            path_target: Directory under which ``delta/<nam_file>`` is written.
        """
        self.path_source = path_source
        self.path_target = path_target
        self.nam_file = nam_file

    def spark_session(self):
        """Build (or reuse) a SparkSession with the Delta extension and catalog.

        Returns:
            The SparkSession returned by ``getOrCreate()``.
        """
        builder = (
            SparkSession.builder.appName("MyApp")
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        )

        return configure_spark_with_delta_pip(builder).getOrCreate()

    def load_schema_file(self, nam_path):
        """Read ``<nam_path>/<nam_file>.json`` and convert it to a StructType.

        Args:
            nam_path: Directory that contains the JSON schema files.

        Returns:
            The ``StructType`` built by ``StructType.fromJson``.
        """
        # Use the argument, not a module-level global, so the method works
        # regardless of what the caller's namespace happens to contain.
        with open(f"{nam_path}/{self.nam_file}.json", 'r') as f:
            data = json.load(f)

        return StructType.fromJson(data)

    @staticmethod
    def _merge_condition(alias, pk):
        """Return the join condition between the table alias and its ``_tmp`` source alias."""
        return f'{alias}.{pk} = {alias}_tmp.{pk}'

    def load_bronze_data_csv(self, spark, schema, header=True, pk='ID'):
        """Read the CSV, stamp it with the load date and write it to Delta.

        When the target path is not yet a Delta table the data is written with
        ``overwrite`` and partitioned by ``dat_load``; otherwise the rows are
        merged into the existing table (update on match, insert otherwise).
        The table is vacuumed afterwards with the default retention.

        Args:
            spark: Active SparkSession.
            schema: Schema applied when reading the CSV.
            header: Whether the CSV has a header row.
            pk: Key column used to match rows during the merge.
        """
        df = spark.read.csv(f'{self.path_source}/{self.nam_file}.csv', header=header, schema=schema)
        desc_date = datetime.now().strftime('%Y%m%d')
        df = df.withColumn('dat_load', lit(desc_date))

        target_path = f'{self.path_target}/delta/{self.nam_file}'

        if not DeltaTable.isDeltaTable(spark, target_path):
            # First load: nothing to merge into yet, so create the table.
            df.write.format("delta").mode("overwrite").partitionBy("dat_load").save(target_path)
            delta_table = DeltaTable.forPath(spark, target_path)
        else:
            # Incremental load: upsert the freshly read rows on the key column.
            delta_table = DeltaTable.forPath(spark, target_path)
            (
                delta_table.alias(f"{self.nam_file}")
                .merge(
                    df.alias(f"{self.nam_file}_tmp"),
                    self._merge_condition(self.nam_file, pk),
                )
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute()
            )

        delta_table.vacuum()


if __name__ == '__main__':
    # Data directory (used as both source and target), dataset name and schema directory.
    nam_path='..//..//data_set'
    nam_file='characters'
    json_path='schemas'

    bronze_data=ProcessBronzeData(nam_file, nam_path, nam_path)
    spark=bronze_data.spark_session()
    schema=bronze_data.load_schema_file(json_path)
    # The characters data is keyed by characterID, not the default 'ID',
    # so the merge key has to be passed explicitly.
    bronze_data.load_bronze_data_csv(spark, schema, pk='characterID')


In [47]:
delta_merge = DeltaTable.forPath(spark, f'{nam_path}/delta/characters')

AnalysisException: `..//..//data_set/delta/characters` is not a Delta table.

In [44]:
delta_merge=delta_merge.toDF()

In [46]:
delta_merge.count()

1170

In [22]:
%%sql


UsageError: %%sql is a cell magic, but the cell body is empty.
