In [None]:
!pip install azure-datalake-store

In [1]:
from azure.datalake.store import core, lib, multithread


In [2]:
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import StringType,StructField, StructType  
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder \
            .master("local[8]") \
            .appName("airflow_app") \
            .config('spark.executor.memory', '16g') \
            .config('spark.driver.memory', '16g') \
            .config('spark.sql.execution.pandas.respectSessionTimeZone', False) \
            .config("spark.driver.maxResultSize", "2048MB") \
            .config("spark.port.maxRetries", "100") \
            .config("spark.sql.execution.arrow.enabled", "true") \
            .getOrCreate()

In [6]:
adlCreds = lib.auth(url_suffix='raizenprd01', resource='https://datalake.azure.net/')

To sign in, use a web browser to open the page https://microsoft.com/devicelogin and enter the code BKGYTJMRJ to authenticate.


In [7]:
spark.conf.set("fs.adl.oauth2.access.token.provider.type", "RefreshToken")
spark.conf.set("fs.adl.oauth2.client.id", adlCreds.token['client'])
spark.conf.set("fs.adl.oauth2.refresh.token", adlCreds.token['refreshToken'])

In [8]:
coef_df = spark.read.format('csv').option('header',True).option('sep',';').load('adl://raizenprd01.azuredatalakestore.net/ldt_dev/sandbox/previsao_demanda/static/TxConvGasolEq.csv')

In [9]:
coef_df = coef_df.select(
        F.to_date(coef_df.inicio,"dd/MM/yyyy").alias('Inicio'),
        F.to_date(coef_df.Fim,"dd/MM/yyyy").alias('Fim'),
        F.regexp_replace(coef_df.TxConversao,",",".").cast("double").alias('TxConversao')
    )


In [10]:
volume_model = spark.read.parquet('adl://raizenprd01.azuredatalakestore.net/ldt_dev/sandbox/previsao_demanda/07_model_input/volume_historico')

In [11]:
volume_historico = spark.read.parquet('adl://raizenprd01.azuredatalakestore.net/ldt_dev/sandbox/previsao_demanda/06_mastertable/master')

In [16]:
volume_historico.show()

+-------------------+----+-------+--------+-----------+---------------+---+
|               Date|Base| Volume|Segmento|    Produto|         Cidade| UF|
+-------------------+----+-------+--------+-----------+---------------+---+
|2012-06-28 00:00:00|BEST| 7000.0|       V|   Gasolina|   PORTO ALEGRE| RS|
|2012-06-28 00:00:00|BEST| 7000.0|       V|   Gasolina|      CRUZ ALTA| RS|
|2012-06-28 00:00:00|BEST| 3000.0|       V|   Gasolina|   CACHOEIRINHA| RS|
|2012-06-28 00:00:00|BEST| 5000.0|       V|   Gasolina|   PORTO ALEGRE| RS|
|2012-06-28 00:00:00|BEST| 5000.0|       V|Diesel S500|   PORTO ALEGRE| RS|
|2012-06-28 00:00:00|BEST| 5000.0|       V|Diesel S500|  NOVO HAMBURGO| RS|
|2012-06-28 00:00:00|BEST|31000.0|       V|Diesel S500|TRES CACHOEIRAS| RS|
|2012-06-28 00:00:00|BEST| 5000.0|       V|Diesel S500|         VIAMAO| RS|
|2012-06-28 00:00:00|BEST| 5000.0|       V|Diesel S500|   CACHOEIRINHA| RS|
|2012-06-28 00:00:00|BEST| 5000.0|       V|Diesel S500|   PORTO ALEGRE| RS|
|2012-06-28 

In [12]:
vol_hist_uf = volume_historico \
                .groupBy('Date', 'Segmento', 'Produto', 'UF').agg({"Volume":"sum"}) \
                .withColumnRenamed('sum(Volume)','Volume_UF') \
                .withColumn('Date', F.to_date(volume_historico.Date,"dd/MM/yyyy"))

In [14]:
## (Coef*Etanol) + Gasolina = Gasolina Equivalente
df_gasolina_eq = vol_hist_uf.where(((F.col("Segmento") == F.lit("V")) & F.col("Produto").isin({"Etanol", "Gasolina",})))
df_gasolina_eq = df_gasolina_eq.groupBy("Date","Segmento", "UF").pivot("Produto").sum("Volume_UF")
df_gasolina_eq = df_gasolina_eq.join(coef_df,[(F.col("Date") > F.col("Inicio")) & (F.col("Date") < F.col("Fim"))],"inner").select('Date','Segmento','Etanol','Gasolina','TxConversao')
df_gasolina_eq = df_gasolina_eq.withColumn('GasolinaEq',F.col('Etanol')*F.col('TxConversao')+F.col('Gasolina'))
df_gasolina_eq = df_gasolina_eq.selectExpr("Date","Segmento","stack(1, 'Gasolina', GasolinaEq) as (Produto, Volume_UF)")

In [16]:
#Diesel Equivalente - Varejo Soma Diesel S10 + Diesel S500
df_s10 = vol_hist_uf.where(((F.col("Segmento") == F.lit("V")) & F.col("Produto").isin({"Diesel S10", "Diesel S500"})))
df_s10 = df_s10.groupBy("Date","Segmento").pivot("Produto").sum("Volume_UF") \
.withColumn('DieselEqVarejo',F.col('Diesel S10')+F.col('Diesel S500'))
df_s10 = df_s10.selectExpr("Date","Segmento","stack(1, 'Diesel', DieselEqVarejo) as (Produto,Volume_UF)")

In [18]:
#Diesel Equivalente - B2B Soma Diesel S10 + Diesel S500
df_b2b = vol_hist_uf.where(((F.col("Segmento") == F.lit("C")) & F.col("Produto").isin({"Diesel S10", "Diesel S500"})))
df_b2b = df_b2b.groupBy("Date","Segmento").pivot("Produto").sum("Volume_UF")\
.withColumn('DieselEqB2B',F.col('Diesel S10')+F.col('Diesel S500'))
df_b2b = df_b2b.selectExpr("Date","Segmento","stack(1, 'Diesel', DieselEqB2B) as (Produto,Volume_UF)")

In [25]:
volume_uf = df_gasolina_eq.union(df_s10).union(df_b2b)

In [10]:
# vol_hist_uf = volume_historico\
#                 .groupBy('Mes', 'Ano', 'UF', 'Segmento','Produto').agg({"Volume":"sum"}).withColumnRenamed('sum(Volume)','Volume_UF')

In [26]:
volume_br = volume_historico.groupBy('Date', 'Segmento', 'Produto').agg({"Volume":"sum"}) \
        .withColumnRenamed('Date', 'YearMonth_Brasil') \
        .withColumnRenamed('Segmento', 'Segmento_Brasil') \
        .withColumnRenamed('Produto', 'Produto_Brasil') \
        .withColumnRenamed('sum(Volume)','Volume_Brasil')
        

In [27]:
volume_br = volume_br.withColumn('YearMonth_Brasil', F.to_date(volume_br.YearMonth_Brasil,"dd/MM/yyyy"))

In [28]:
rateio = volume_uf.join(volume_br, (volume_uf.Date == volume_br.YearMonth_Brasil)  &   (volume_uf.Segmento == volume_br.Segmento_Brasil) & (volume_uf.Produto == volume_br.Produto_Brasil))

In [32]:
rateio = rateio.withColumn('Rateio',F.col('Volume_UF')/F.col('Volume_Brasil')) 

In [33]:
rateio.show(10)

Py4JJavaError: An error occurred while calling o672.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 29.0 failed 1 times, most recent failure: Lost task 3.0 in stage 29.0 (TID 2269, localhost, executor driver): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 212295 ms
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:274)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3383)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)


In [81]:
df_peso = spark.read.format('csv').option('header', True).option('sep',';').load('adl://raizenprd01.azuredatalakestore.net/ldt_dev/sandbox/previsao_demanda/static/peso_vendas_calendario.csv')

In [82]:
df_peso = df_peso.groupBy('Date', 'UF', 'Produto', 'Segmento').agg({"Peso": "sum"})

In [83]:
df_peso.show()

+---------+---+-----------+--------+-----------------+
|     Date| UF|    Produto|Segmento|        sum(Peso)|
+---------+---+-----------+--------+-----------------+
| 3-Apr-12| MA|     Etanol|       V| 1.00429408716469|
| 3-Apr-12| PI|   Gasolina|       V|0.883433977878056|
| 3-Apr-12| RS|Diesel S500|       C| 0.97929933399968|
| 4-Apr-12| MS|     Etanol|       V|0.820835511226811|
| 5-Apr-12| CE| Diesel S10|       C|0.869423815634516|
| 6-Apr-12| SC|Diesel S500|       C| 1.03348273093332|
|10-Apr-12| RO|   Gasolina|       V|0.861782210598827|
|16-Apr-12| RO|   Gasolina|       V| 1.02174061530452|
|17-Apr-12| SP|Diesel S500|       V|0.933976224869288|
|25-Apr-12| MG|     Etanol|       V|0.923999701327453|
|26-Apr-12| PB|     Etanol|       V|0.914238222354483|
|27-Apr-12| AL|   Gasolina|       V| 1.07002186387463|
| 3-May-12| RJ|     Etanol|       V|0.866675327187963|
| 3-May-12| RO|     Etanol|       V|0.735084985683681|
|12-May-12| SC|     Etanol|       V|0.556862915183844|
|12-May-12

In [90]:
df_peso = df_peso.withColumn("Date",(F.regexp_replace(df_peso.Date,"-",""))) \
        .withColumn("Date", F.to_date(F.col("Date"), "dd.MM.yyyy"))
            

In [91]:
df_peso.show(3)

+----+---+-----------+--------+-----------------+
|Date| UF|    Produto|Segmento|        sum(Peso)|
+----+---+-----------+--------+-----------------+
|null| MA|     Etanol|       V| 1.00429408716469|
|null| PI|   Gasolina|       V|0.883433977878056|
|null| RS|Diesel S500|       C| 0.97929933399968|
+----+---+-----------+--------+-----------------+
only showing top 3 rows



In [51]:
df_peso = df_peso.withColumn("Dia",F.date_format(F.col('Date'),"dd")) \
        .withColumn("Mes",F.date_format(F.col('Date'),"MM")) \
        .withColumn("Ano",F.date_format(F.col('Date'),"YYYY"))
                
df_peso.show()

+---------+---+-----------+--------+-----------------+----+----+----+
|     Date| UF|    Produto|Segmento|        sum(Peso)| Ano| Mes| Dia|
+---------+---+-----------+--------+-----------------+----+----+----+
| 3-Apr-12| MA|     Etanol|       V| 1.00429408716469|null|null|null|
| 3-Apr-12| PI|   Gasolina|       V|0.883433977878056|null|null|null|
| 3-Apr-12| RS|Diesel S500|       C| 0.97929933399968|null|null|null|
| 4-Apr-12| MS|     Etanol|       V|0.820835511226811|null|null|null|
| 5-Apr-12| CE| Diesel S10|       C|0.869423815634516|null|null|null|
| 6-Apr-12| SC|Diesel S500|       C| 1.03348273093332|null|null|null|
|10-Apr-12| RO|   Gasolina|       V|0.861782210598827|null|null|null|
|16-Apr-12| RO|   Gasolina|       V| 1.02174061530452|null|null|null|
|17-Apr-12| SP|Diesel S500|       V|0.933976224869288|null|null|null|
|25-Apr-12| MG|     Etanol|       V|0.923999701327453|null|null|null|
|26-Apr-12| PB|     Etanol|       V|0.914238222354483|null|null|null|
|27-Apr-12| AL|   Ga

In [22]:
df_varejo = df_peso.where(((F.col("Segmento") == F.lit("V")) & F.col("Produto").isin({"Etanol", "Gasolina",})))
df_varejo.show()

+---------+---+--------+--------+-----------------+
|     Date| UF| Produto|Segmento|        sum(Peso)|
+---------+---+--------+--------+-----------------+
| 3-Apr-12| MA|  Etanol|       V| 1.00429408716469|
| 3-Apr-12| PI|Gasolina|       V|0.883433977878056|
| 4-Apr-12| MS|  Etanol|       V|0.820835511226811|
|10-Apr-12| RO|Gasolina|       V|0.861782210598827|
|16-Apr-12| RO|Gasolina|       V| 1.02174061530452|
|25-Apr-12| MG|  Etanol|       V|0.923999701327453|
|26-Apr-12| PB|  Etanol|       V|0.914238222354483|
|27-Apr-12| AL|Gasolina|       V| 1.07002186387463|
| 3-May-12| RJ|  Etanol|       V|0.866675327187963|
| 3-May-12| RO|  Etanol|       V|0.735084985683681|
|12-May-12| SC|  Etanol|       V|0.556862915183844|
|12-May-12| SE|  Etanol|       V|0.790764219093153|
|18-May-12| PI|  Etanol|       V|0.985815135185125|
|19-May-12| PR|Gasolina|       V|0.859426571418272|
|19-May-12| SE|  Etanol|       V|0.790764219093153|
|21-May-12| DF|  Etanol|       V|0.837807055001337|
|22-May-12| 

In [14]:
df_varejo_diesel = df.where(((F.col("Segmento") == F.lit("V")) & F.col("Produto").isin({"Diesel S10", "Diesel S500"})))
df_varejo_diesel.show()

+--------+---+-----------+--------+-----------------+
|    Date| UF|    Produto|Segmento|             Peso|
+--------+---+-----------+--------+-----------------+
|1-Apr-12| AL|Diesel S500|       V|0.000793939173022|
|1-Apr-12| BA|Diesel S500|       V|0.165102927097304|
|1-Apr-12| CE| Diesel S10|       V|0.001777721194909|
|1-Apr-12| ES|Diesel S500|       V| 0.03087335402366|
|1-Apr-12| MA|Diesel S500|       V|0.005326310031586|
|1-Apr-12| MG|Diesel S500|       V| 0.14445075784205|
|1-Apr-12| PA| Diesel S10|       V|0.002458633759597|
|1-Apr-12| PE| Diesel S10|       V|0.029104660549066|
|1-Apr-12| PI|Diesel S500|       V|0.005386584525703|
|1-Apr-12| PR|Diesel S500|       V|0.154093017400943|
|1-Apr-12| RJ|Diesel S500|       V|0.109358808269283|
|1-Apr-12| RS|Diesel S500|       V|0.002032309504466|
|1-Apr-12| SE|Diesel S500|       V|0.073806739925784|
|1-Apr-12| SP|Diesel S500|       V|0.082311557358312|
|2-Apr-12| AL|Diesel S500|       V| 1.03098161415133|
|2-Apr-12| BA|Diesel S500|  

In [15]:
df_b2b = df.where(((F.col("Segmento") == F.lit("C")) & F.col("Produto").isin({"Diesel S10", "Diesel S500"})))
df_b2b.show()

+--------+---+-----------+--------+-----------------+
|    Date| UF|    Produto|Segmento|             Peso|
+--------+---+-----------+--------+-----------------+
|1-Apr-12| AL|Diesel S500|       C|                0|
|1-Apr-12| BA|Diesel S500|       C|0.106229853387477|
|1-Apr-12| CE| Diesel S10|       C|0.030886005500449|
|1-Apr-12| ES|Diesel S500|       C|0.000179199636193|
|1-Apr-12| MA|Diesel S500|       C|0.027962273990326|
|1-Apr-12| MG| Diesel S10|       C|0.050711028668165|
|1-Apr-12| MG|Diesel S500|       C|0.126477653119571|
|1-Apr-12| PA| Diesel S10|       C|0.005766407356846|
|1-Apr-12| PE| Diesel S10|       C|0.001318184493358|
|1-Apr-12| PI|Diesel S500|       C|                0|
|1-Apr-12| PR| Diesel S10|       C|0.103676433434961|
|1-Apr-12| PR|Diesel S500|       C|0.205606122161058|
|1-Apr-12| RJ| Diesel S10|       C|0.035321976717713|
|1-Apr-12| RJ|Diesel S500|       C| 0.00844572208862|
|1-Apr-12| RS| Diesel S10|       C|                0|
|1-Apr-12| RS|Diesel S500|  

DataFrame[Date: string, UF: string, Produto: string, Segmento: string, sum(Peso): double]

+--------+---+-----------+--------+-----------------+
|    Date| UF|    Produto|Segmento|             Peso|
+--------+---+-----------+--------+-----------------+
|1-Apr-12| AC|     Etanol|       V|                0|
|1-Apr-12| AC|   Gasolina|       V|                0|
|1-Apr-12| AL|Diesel S500|       C|                0|
|1-Apr-12| AL|Diesel S500|       V|0.000793939173022|
|1-Apr-12| AL|     Etanol|       V|0.001380148385852|
|1-Apr-12| AL|   Gasolina|       V|0.001043360657431|
|1-Apr-12| AM|     Etanol|       V|0.012257468537506|
|1-Apr-12| AM|   Gasolina|       V|0.005977460471064|
|1-Apr-12| BA|Diesel S500|       C|0.106229853387477|
|1-Apr-12| BA|Diesel S500|       V|0.165102927097304|
|1-Apr-12| BA|     Etanol|       V|0.145752385200027|
|1-Apr-12| BA|   Gasolina|       V|0.162125998225635|
|1-Apr-12| CE| Diesel S10|       C|0.030886005500449|
|1-Apr-12| CE| Diesel S10|       V|0.001777721194909|
|1-Apr-12| CE|     Etanol|       V|0.001916009334648|
|1-Apr-12| CE|   Gasolina|  

AttributeError: 'DataFrame' object has no attribute 'cast'

In [59]:
df_varejo = df.groupBy('Date', 'UF', 'Produto').pivot('Segmento').sum('Peso')

AnalysisException: '"Peso" is not a numeric column. Aggregation function can only be applied on a numeric column.;'

In [43]:
df.groupBy('Date', 'Produto', 'Segmento').pivot('UF').sum('Peso')

AnalysisException: 'Cannot resolve column name "Peso" among (Date, UF, Produto, Segmento, sum(Peso));'

In [31]:
df = df.where(((F.col("Segmento") == F.lit("V")) & F.col("Produto").isin({"Etanol", "Gasolina",})))

In [32]:
df.show(10)

+--------+---+--------+--------+-----------------+
|    Date| UF| Produto|Segmento|             Peso|
+--------+---+--------+--------+-----------------+
|1-Apr-12| AC|  Etanol|       V|                0|
|1-Apr-12| AC|Gasolina|       V|                0|
|1-Apr-12| AL|  Etanol|       V|0.001380148385852|
|1-Apr-12| AL|Gasolina|       V|0.001043360657431|
|1-Apr-12| AM|  Etanol|       V|0.012257468537506|
|1-Apr-12| AM|Gasolina|       V|0.005977460471064|
|1-Apr-12| BA|  Etanol|       V|0.145752385200027|
|1-Apr-12| BA|Gasolina|       V|0.162125998225635|
|1-Apr-12| CE|  Etanol|       V|0.001916009334648|
|1-Apr-12| CE|Gasolina|       V|0.002980799554151|
+--------+---+--------+--------+-----------------+
only showing top 10 rows

