In [82]:
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import  col, datediff, udf, explode,to_date
from delta.tables import *

In [84]:
spark = (
    SparkSession
    .builder
    .appName("incrementa_reservas_delta")
    .config("spark.jars", "/opt/spark/jars/gcs-connector-hadoop3-latest.jar")
    # .config("spark.jars", "./spark/jars/spark-sql-kafka-0-10_2.12-3.1.2.jar")
    #com.google.cloud.bigdataoss:gcs-connector:1.9.4-hadoop3
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0,io.delta:delta-contribs_2.12:1.0.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.delta.logStore.gs.impl", "io.delta.storage.GCSLogStore")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    .config("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    .config("fs.gs.auth.service.account.enable", "true")
    .config("fs.gs.auth.service.account.json.keyfile", "/etc/gcp/sa_credentials.json")
    .enableHiveSupport()
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)

In [85]:
df = (
    spark
    .read
    .format("delta")
    .load("gs://poc_delta/stage/")
)



In [4]:
df.printSchema()

root
 |-- cod_reserva: string (nullable = true)
 |-- agencia: string (nullable = true)
 |-- dt_retirada: date (nullable = true)
 |-- cod_grupo: string (nullable = true)
 |-- dt_geracao_f: timestamp (nullable = true)
 |-- dt_devolucao: date (nullable = true)
 |-- dt_geracao: string (nullable = true)



In [87]:
spark.sql("""
          select count(cod_reserva)
          from delta.`gs://poc_delta/stage/`
          """).show()

21/11/30 19:02:08 WARN ObjectStore: Failed to get database delta, returning NoSuchObjectException


+------------------+
|count(cod_reserva)|
+------------------+
|              1282|
+------------------+





In [10]:
df = df.withColumn("qtd_diarias", datediff(col("dt_devolucao"), col("dt_retirada")))

In [11]:
df.show(5)



+-----------+-------+-----------+---------+--------------------+------------+--------------------+-----------+
|cod_reserva|agencia|dt_retirada|cod_grupo|        dt_geracao_f|dt_devolucao|          dt_geracao|qtd_diarias|
+-----------+-------+-----------+---------+--------------------+------------+--------------------+-----------+
|   19812892|    MCP| 2021-12-11|        B|2021-11-29 16:52:...|  2021-12-16|2021-11-29 16:52:...|          5|
|   61381162|    THE| 2021-12-01|       GX|2021-11-29 16:52:...|  2021-12-06|2021-11-29 16:52:...|          5|
|   99007065|    PMW| 2021-12-04|        C|2021-11-29 16:52:...|  2021-12-09|2021-11-29 16:52:...|          5|
|   75329358|    SDU| 2021-12-12|        B|2021-11-29 16:52:...|  2021-12-17|2021-11-29 16:52:...|          5|
|   21666223|    VCP| 2021-12-07|        B|2021-11-29 16:52:...|  2021-12-12|2021-11-29 16:52:...|          5|
+-----------+-------+-----------+---------+--------------------+------------+--------------------+-----------+
o



In [57]:
import datetime

dt_inicio = datetime.date(2021,1,1)
dt_fim = datetime.date(2021,1,5)
dt_aux = dt_inicio
lista_date = []

for i in range(5):
    if dt_aux == dt_inicio:
        lista_date.append(dt_inicio)
    else:
        lista_date.append(dt_aux)
        
    dt_aux = dt_aux + datetime.timedelta(days=1)

print(lista_date)

[datetime.date(2021, 1, 1), datetime.date(2021, 1, 2), datetime.date(2021, 1, 3), datetime.date(2021, 1, 4), datetime.date(2021, 1, 5)]


In [60]:
def dateList(qtd: int, data_inicio: datetime.date):
    dt_aux = data_inicio
    lista_date = []
    for i in range(qtd):
        if dt_aux == dt_inicio:
            lista_date.append(dt_inicio.strftime("%Y-%m-%d %H:%M:%S"))
            #lista_date.append(dt_inicio)
        else:
            lista_date.append(dt_aux.strftime("%Y-%m-%d %H:%M:%S"))
            #lista_date.append(dt_aux)
            
        dt_aux = dt_aux + datetime.timedelta(days=1)
    return lista_date
    

In [61]:
dateListUdf = udf(f=lambda qtd, data_inicio: dateList(qtd,data_inicio),returnType=ArrayType(StringType()))

In [62]:
df = df.withColumn("lista_datas", dateListUdf(col("qtd_diarias"), col("dt_retirada")))

In [63]:
df.show(1, vertical=True, truncate=False)



-RECORD 0-----------------------------------------------------------------------------------------------------------------
 cod_reserva  | 19812892                                                                                                  
 agencia      | MCP                                                                                                       
 dt_retirada  | 2021-12-11                                                                                                
 cod_grupo    | B                                                                                                         
 dt_geracao_f | 2021-11-29 16:52:27.448045                                                                                
 dt_devolucao | 2021-12-16                                                                                                
 dt_geracao   | 2021-11-29 16:52:27.448045                                                                                
 qtd_diarias  | 



In [64]:
df.printSchema()

root
 |-- cod_reserva: string (nullable = true)
 |-- agencia: string (nullable = true)
 |-- dt_retirada: date (nullable = true)
 |-- cod_grupo: string (nullable = true)
 |-- dt_geracao_f: timestamp (nullable = true)
 |-- dt_devolucao: date (nullable = true)
 |-- dt_geracao: string (nullable = true)
 |-- qtd_diarias: integer (nullable = true)
 |-- lista_datas: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [78]:
df_quebra = (df.select("cod_reserva","agencia","cod_grupo",explode("lista_datas"))
             .withColumnRenamed("col","dt_referencia")
             .withColumn("dt_referencia",to_date("dt_referencia","yyyy-MM-dd HH:mm:ss")))
df_quebra.printSchema()

root
 |-- cod_reserva: string (nullable = true)
 |-- agencia: string (nullable = true)
 |-- cod_grupo: string (nullable = true)
 |-- dt_referencia: date (nullable = true)



In [79]:
df_quebra.createOrReplaceTempView("reserva_quebra")

In [80]:
spark.sql("""
          select agencia, cod_grupo, dt_referencia, count(1) as qtd
          from reserva_quebra
          group by agencia, cod_grupo, dt_referencia"""
).show()




+-------+---------+-------------+---+
|agencia|cod_grupo|dt_referencia|qtd|
+-------+---------+-------------+---+
|    CGR|       GX|   2021-12-01|  1|
|    CNF|        C|   2021-12-02|  3|
|    SDU|       GX|   2021-12-06|  2|
|    CGB|        B|   2021-12-03|  1|
|    POA|        C|   2021-12-11|  6|
|    REC|        B|   2021-12-17|  2|
|    BPS|        F|   2021-12-13|  1|
|    CFB|       GC|   2021-12-13|  1|
|    CNF|       GC|   2021-12-19|  1|
|    XAP|        A|   2021-12-25|  1|
|    NAT|       FX|   2021-11-30|  1|
|    RBR|       GC|   2021-12-03|  1|
|    LDB|       GX|   2021-12-20|  1|
|    MAO|        C|   2021-12-07|  1|
|    REC|        F|   2021-12-14|  2|
|    FLN|        C|   2021-12-08|  6|
|    XAP|        C|   2021-12-09|  4|
|    MCP|       GX|   2021-12-20|  2|
|    CWB|        C|   2021-11-29|  2|
|    LDB|        C|   2021-12-18|  2|
+-------+---------+-------------+---+
only showing top 20 rows





In [81]:
spark.stop()