## Imports

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
from delta.tables import *
from delta import *
import pyspark.sql.functions as PSF
import psycopg2
import pandas as pd
import os
import time

## Coletar as variaveis de ambiente definidas em [Variáveis de Ambiente](https://github.com/Renabouj/Tendencias-e-Desafios-em-SI-2021.2/blob/main/README.md#vari%C3%A1veis-de-ambiente).

In [2]:
env_vars = !cat ../Environment/.env
for var in env_vars:
    key, value = var.split('=')
    os.environ[key] = value

## Funcão criada a fim de salvar os dados no Banco de Dados

In [3]:
# O parâmetro OVERWRITE somente será True se já
# existir uma tabela cujo nome é o passado no 
# parâmetro DBNAME.
#
# O parâmetro APPEND será True somente se já
# existir uma tabela no banco de dados e o
# usuário queira apenas adicionar uma informação
# específica.

def save_data_to_database(df, dbname: str, overwrite: bool, append: bool):
    if overwrite:
        df.write.format("jdbc").option("driver", "org.postgresql.Driver").option("url", os.environ.get('DB_CONNECTION_STRING')) \
        .option("dbtable", dbname).option("user", os.environ.get('USER')).option("password", os.environ.get('PASSWORD')) \
        .mode("overwrite").save()
        
        return print("Overwrite done!")
    elif append:
        df.write.format("jdbc").option("driver", "org.postgresql.Driver").option("url", os.environ.get('DB_CONNECTION_STRING')) \
        .option("dbtable", dbname).option("user", os.environ.get('USER')).option("password", os.environ.get('PASSWORD')) \
        .mode("append").save()
        
        return print("Saved with append")
    else:
        df.write.format("jdbc").option("driver", "org.postgresql.Driver").option("url", os.environ.get('DB_CONNECTION_STRING')) \
        .option("dbtable", dbname).option("user", os.environ.get('USER')).option("password", os.environ.get('PASSWORD')).save()
        
        return print("Saved without overwrite!")

## Configurações do Spark

In [4]:
builder = pyspark.sql.SparkSession.builder.appName("PySpark_Postgres_test").config("spark.jars", os.environ.get('SPARKJAR')) \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .master("local").appName("PySpark_Postgres_test")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

22/05/18 13:09:09 WARN Utils: Your hostname, MacBook-Pro-de-Renato.local resolves to a loopback address: 127.0.0.1; using 10.0.0.102 instead (on interface en0)
22/05/18 13:09:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/Users/renabouj/Codes/tendenciasSI/env/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/renabouj/.ivy2/cache
The jars for the packages stored in: /Users/renabouj/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-903cfb66-a1fe-464c-ac3b-fa1fa58b06ef;1.0
	confs: [default]
	found io.delta#delta-core_2.12;1.2.1 in central
	found io.delta#delta-storage;1.2.1 in central
	found org.antlr#antlr4-runtime;4.8 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
:: resolution report :: resolve 216ms :: artifacts dl 24ms
	:: modules in use:
	io.delta#delta-core_2.12;1.2.1 from central in [default]
	io.delta#delta-storage;1.2.1 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	org.codehaus.jackson#jackson-core-asl;1.9.13 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnl

## Importar os dados encontrados no Banco de Dados

In [5]:
dataf = spark.read.format("jdbc").option("url", os.environ.get('DB_CONNECTION_STRING')) \
    .option("driver", "org.postgresql.Driver").option("dbtable", "sales") \
    .option("user", os.environ.get('USER')).option("password", os.environ.get('PASSWORD')).load()

In [6]:
dataf.where(dataf.Order_ID == 451691138).show()

[Stage 0:>                                                          (0 + 1) / 1]

+--------------------+---------+---------+-------------+--------------+----------+---------+---------+----------+----------+---------+-------------+----------+------------+
|              Region|  Country|Item_Type|Sales_Channel|Order_Priority|Order_Date| Order_ID|Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|Total_Cost|Total_Profit|
+--------------------+---------+---------+-------------+--------------+----------+---------+---------+----------+----------+---------+-------------+----------+------------+
|Australia and Oce...|Australia|     Meat|       Online|             C|  4/4/2011|451691138|5/23/2011|      4300|       421|      364|      1814127|   1568167|      245960|
+--------------------+---------+---------+-------------+--------------+----------+---------+---------+----------+----------+---------+-------------+----------+------------+



                                                                                

## [Experimento 1] Experimento com 10 Repetições

### Salvar os Dados em uma Pasta Temporária

In [7]:
dataf.write.format("delta").save("/tmp/output/experimentodelta10")

                                                                                

#### Leitura dos Dados Salvos na Pasta Temporária

In [8]:
parDF = spark.read.format("delta").load("/tmp/output/experimentodelta10")
parDF.where(parDF.Order_ID == 451691138).show()

                                                                                

+--------------------+---------+---------+-------------+--------------+----------+---------+---------+----------+----------+---------+-------------+----------+------------+
|              Region|  Country|Item_Type|Sales_Channel|Order_Priority|Order_Date| Order_ID|Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|Total_Cost|Total_Profit|
+--------------------+---------+---------+-------------+--------------+----------+---------+---------+----------+----------+---------+-------------+----------+------------+
|Australia and Oce...|Australia|     Meat|       Online|             C|  4/4/2011|451691138|5/23/2011|      4300|       421|      364|      1814127|   1568167|      245960|
+--------------------+---------+---------+-------------+--------------+----------+---------+---------+----------+----------+---------+-------------+----------+------------+



#### Execução do Experimento

In [9]:
teste = DeltaTable.forPath(spark, "/tmp/output/experimentodelta10")
start = time.time()
for i in range(10):
    teste.update(
        condition = PSF.expr("Order_ID == 451691138"),
        set = { "Unit_Price": PSF.expr("Unit_Price + 1")
    }),
    teste.update(
        condition = PSF.expr("Order_ID == 451691138"),
        set = { "Units_Sold": PSF.expr("Units_Sold + 1")
    })
end = time.time()
print(end - start)

                                                                                

75.68113589286804


In [10]:
parDF = spark.read.format("delta").load("/tmp/output/experimentodelta10")
parDF.where(parDF.Order_ID == 451691138).show()

+--------------------+---------+---------+-------------+--------------+----------+---------+---------+----------+----------+---------+-------------+----------+------------+
|              Region|  Country|Item_Type|Sales_Channel|Order_Priority|Order_Date| Order_ID|Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|Total_Cost|Total_Profit|
+--------------------+---------+---------+-------------+--------------+----------+---------+---------+----------+----------+---------+-------------+----------+------------+
|Australia and Oce...|Australia|     Meat|       Online|             C|  4/4/2011|451691138|5/23/2011|      4310|       431|      364|      1814127|   1568167|      245960|
+--------------------+---------+---------+-------------+--------------+----------+---------+---------+----------+----------+---------+-------------+----------+------------+



#### Salvar Todas as Informações Alteradas no Banco de Dados

In [11]:
start = time.time()
save_data_to_database(parDF, "sales", True, False)
end = time.time()
print(end - start)
print("Vá no TablePlus, aperte command + r e busque o item Order_ID == 451691138")
# Units_Sold = 4310
# Unit_Price = 431

[Stage 266:>                                                        (0 + 1) / 1]

Overwrite done!
21.101806163787842
Vá no TablePlus, aperte command + r e busque o item Order_ID == 451691138


                                                                                

#### Deletar um Order_ID e Salvar no Banco de Dados

In [12]:
deleting = DeltaTable.forPath(spark, "/tmp/output/experimentodelta10")
deleting.delete(PSF.col('Order_ID') == 451691138)

                                                                                

In [13]:
deletedDF = spark.read.format("delta").load("/tmp/output/experimentodelta10")
deletedDF.count()
# Resultado deve ser 254159.

254159

In [14]:
save_data_to_database(deletedDF, "sales", True, False) #Overwrite = True; Append = False
print("Vá no TablePlus, aperte command + r e busque o item Order_ID == 451691138")

[Stage 286:>                                                        (0 + 1) / 1]

Overwrite done!
Vá no TablePlus, aperte command + r e busque o item Order_ID == 451691138


                                                                                

#### Inserir Apenas o Conjunto de Dados Excluídos no Passo Anterior

In [15]:
check_versions = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/output/experimentodelta10")
check_versions.count()
# Resultado será 254160.

                                                                                

254160

In [16]:
df_final = check_versions.where(check_versions.Order_ID == 451691138)
df_final.count()
# Resultado será 1.

1

In [18]:
price_col = "Unit_Price"
sold_col = "Units_Sold"
price = 421
sold = 4310

In [17]:
save_data_to_database(df_final, "sales", False, True) #Overwrite = False; Append = True
print("Vá no TablePlus, aperte command + r e busque o item Order_ID == 451691138")
print(f"Ao fazer a query, os valores de {price_col} e {sold_col} serao, respectivamente, {price} e {sold}")

Saved with append
Vá no TablePlus, aperte command + r e busque o item Order_ID == 451691138


#### Retornar Todos os Dados Para a Versão Inicial e Salvar no Banco de Dados

In [None]:
history = spark.sql("DESCRIBE HISTORY delta.`/tmp/output/experimentodelta10`")
latest_version = history.selectExpr("max(version)").collect()
print(latest_version[0][0])

In [None]:
df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/output/experimentodelta")
df.where(df.Order_ID == 451691138).count()

In [None]:
save_data_to_database(df, "sales", True, False) #Overwrite = True; Append = False

## [Experimento 2] Experimento com 100 Repetições

#### Salvar os Dados em uma Pasta Temporária

In [19]:
dataf.write.format("delta").save("/tmp/output/experimentodelta100")

                                                                                

#### Execução do Experimento

In [20]:
teste = DeltaTable.forPath(spark, "/tmp/output/experimentodelta100")
start = time.time()
for i in range(100):
    teste.update(
        condition = PSF.expr("Order_ID == 451691138"),
        set = { "Unit_Price": PSF.expr("Unit_Price + 1")
    }),
    teste.update(
        condition = PSF.expr("Order_ID == 451691138"),
        set = { "Units_Sold": PSF.expr("Units_Sold + 1")
    })
end = time.time()
print(end - start)

                                                                                

700.8368561267853


#### Salvar as Informações Alteradas no Banco de Dados

In [22]:
parDF = spark.read.format("delta").load("/tmp/output/experimentodelta100")

In [23]:
price_value100 = 521
sold_value100 = 4400
price_col100 = "Unit_Price"
sold_col100 = "Units_Sold"

In [24]:
start = time.time()
save_data_to_database(parDF, "sales", False, True)
end = time.time()
print(end - start)
print("Vá no TablePlus, aperte command + r e busque o item Order_ID == 451691138")
print(f"Ao fazer a query, os valores de {price_col100} e {sold_col100} serao, respectivamente, {price_value100} e {sold_value100}.")
# Units_Sold = 4400
# Unit_Price = 521

[Stage 2815:>                                                       (0 + 1) / 1]

Saved with append
23.30550193786621
Vá no TablePlus, aperte command + r e busque o item Order_ID == 451691138
Ao fazer a query, os valores de Unit_Price e Units_Sold serao, respectivamente, 521 e 4400


                                                                                

#### Deletar um Order_ID e Salvar no Banco de Dados

In [25]:
deleting = DeltaTable.forPath(spark, "/tmp/output/experimentodelta100")
deleting.delete(PSF.col('Order_ID') == 451691138)

                                                                                

In [26]:
deletedDF = spark.read.format("delta").load("/tmp/output/experimentodelta100")
deletedDF.count()
# Resultado será 0

254159

In [27]:
save_data_to_database(deletedDF, "sales", True, False) #Overwrite = True; Append = False
print("Vá no TablePlus, aperte command + r e busque o item Order_ID == 451691138")

[Stage 2835:>                                                       (0 + 1) / 1]

Overwrite done!
Vá no TablePlus, aperte command + r e busque o item Order_ID == 451691138


                                                                                

#### Inserir Apenas o Conjunto de Dados Excluídos no Passo Anterior

In [28]:
check_versions = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/output/experimentodelta100")
check_versions.count()

                                                                                

254160

In [29]:
df_final = check_versions.where(check_versions.Order_ID == 451691138)
df_final.count()
# Resultado será 1.

1

In [30]:
save_data_to_database(df_final, "sales", False, True) #Overwrite = False; Append = True
print("Vá no TablePlus, aperte command + r e busque o item Order_ID == 451691138")

Saved with append
Vá no TablePlus, aperte command + r e busque o item Order_ID == 451691138


#### Retornar os Dados Para a Versão Inicial e Salvar no Banco de Dados

In [None]:
history = spark.sql("DESCRIBE HISTORY delta.`/tmp/output/experimentodelta100`")
latest_version = history.selectExpr("max(version)").collect()
print(latest_version[0][0])

In [None]:
df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/output/experimentodelta100")
df.where(df.Order_ID == 451691138).count()

In [None]:
save_data_to_database(df, "sales", True, False) #Overwrite = True; Append = False

## [Experimento 3] Experimento com 1000 Repetições

#### Salvar os Dados em uma Pasta Temporária

In [None]:
dataf.write.format("delta").save("/tmp/output/experimentodelta1000")

#### Execução do Experimento

In [75]:
teste = DeltaTable.forPath(spark, "/tmp/output/experimentodelta1000")
start = time.time()
for i in range(1000):
    teste.update(
        condition = PSF.expr("Order_ID == 451691138"),
        set = { "Unit_Price": PSF.expr("Unit_Price + 1")
    }),
    teste.update(
        condition = PSF.expr("Order_ID == 451691138"),
        set = { "Units_Sold": PSF.expr("Units_Sold + 1")
    })
end = time.time()
print(end - start)

                                                                                

11.589659929275513


#### Salvar as Informações Alteradas no Banco de Dados

In [None]:
parDF = spark.read.format("delta").load("/tmp/output/experimentodelta1000")

In [None]:
price_value1000 = 1421
sold_value1000 = 5300
price_col1000 = "Unit_Price"
sold_col1000 = "Units_Sold"

In [None]:
start = time.time()
save_data_to_database(parDF, "sales", False, True)
end = time.time()
print(end - start)
print("Vá no TablePlus, aperte command + r e busque o item Order_ID == 451691138")
print(f"Ao fazer a query, os valores de {price_col1000} e {sold_col1000} serao, respectivamente, {price_value1000} e {sold_value1000}.")
# Units_Sold = 5300
# Unit_Price = 1421

#### Deletar um Order_ID e Salvar no Banco de Dados

In [None]:
deleting = DeltaTable.forPath(spark, "/tmp/output/experimentodelta1000")
deleting.delete(PSF.col('Order_ID') == 451691138)

In [None]:
deletedDF = spark.read.format("delta").load("/tmp/output/experimentodelta1000")
deletedDF.count()
# Resultado será 0.

In [None]:
save_data_to_database(deletedDF, "sales", True, False) #Overwrite = True; Append = False
print("Vá no TablePlus, aperte command + r e busque o item Order_ID == 451691138")

#### Inserir Apenas o Conjunto de Dados Excluídos no Passo Anterior

In [None]:
check_versions = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/output/experimentodelta1000")
check_versions.count()

In [None]:
df_final = check_versions.where(check_versions.Order_ID == 451691138)
df_final.count()
# Resultado será 1.

In [None]:
save_data_to_database(df_final, "sales", False, True) #Overwrite = False; Append = True
print("Vá no TablePlus, aperte command + r e busque o item Order_ID == 451691138")

#### Retornar os Dados Para a Versão Inicial e Salvar no Banco de Dados

In [None]:
history = spark.sql("DESCRIBE HISTORY delta.`/tmp/output/experimentodelta1000`")
latest_version = history.selectExpr("max(version)").collect()
print(latest_version[0][0])

In [None]:
df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/output/experimentodelta1000")
df.where(df.Order_ID == 451691138).count()

In [None]:
save_data_to_database(df, "sales", True, False) #Overwrite = True; Append = False