# Importing Libraries

In [2]:
#imports Libs

import os
import pandas as pd
import boto3
import time
from botocore.client import ClientError

import pyarrow.parquet as pq
import s3fs
import calendar

import findspark                                              #Import library to Search for Spark Installation  

findspark.init()                                              #Search Spark Installation

import pyspark                                                #Only run after findspark.init()

from pyspark.sql import SparkSession                          #Import of Spark Session
from pyspark import SparkContext as spark                     #Import the Regular Spark Contex 
from pyspark.sql import SQLContext                            #Import the SQL Spark Contex
from pyspark.sql.functions import *
from pyspark.sql.types import *


spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext                                       #Initialize Spark

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
7,application_1575983967584_0008,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

------------------

# Creating Variables

## Database

In [None]:
ATHENA_SPG = os.environ['SPG_DATABASE']

In [None]:
ATHENA_HANA = os.environ['GERDAU_HANA_DATABASE']

In [None]:
ATHENA_SALES = os.environ['GERDAU_SALES_DATABASE']

In [None]:
ATHENA_BW = os.environ['GERDAU_BW_DATABASE']

## Buckets

In [None]:
SPG_MANUAL_INPUT_BUCKET = os.environ['MANUAL_INPUT_BUCKET']

In [None]:
SPG_INTEGRATION_INPUT_BUCKET = os.environ['INTEGRATION_INPUT_BUCKET']

In [None]:
SPG_QUERY_BUCKET = os.environ['QUERY_BUCKET']

In [None]:
GERDAU_BUCKET = os.environ['GERDAU_BUCKET']

In [None]:
SPG_OUTPUT_BUCKET = os.environ['OUTPUT_BUCKET']

In [None]:
SPG_INPUT_BUCKET = os.environ['OUTPUT_BUCKET_FROM_OUTPUT']

## Input Paths

In [None]:
# SPG Support Tabel Calendar

SPG_INPUT_BUCKET_CALENDAR = "SPG_GLOBAL/SUPPORT/SPG_SUPPORT_CALENDARIO/tb_SPG_SUPPORT_CALENDARIO.parquet"

In [None]:
QUERY_VIEW = "SELECT billing_number, sales_item, sales_order_date, branch, sales_org_cod, billing_date, sales_number, material_cod, material_desc, gpd_cod, gpd_desc, issuing_city, issuing_state, gpm, customer_group_desc, receiving_customer_cod, receiving_customer_desc, fabricante, aprovado, quantity_ton, preco_lista, zd01, zd02, zd03, zd04, zd05, zeb1, zeb2, zeaf, zd13, zd_total_d, zd_total_p, zp30, zsu2, zp20, bx41, alcada, desvio_alcada, rbv, icms, rlv, practiced_price, preco_prat_norm, preco_prat_norm_kg, preco_politica, confirmed_ton FROM "+ATHENA_SPG+".vw_strategy_sample_v18 WHERE billing_type NOT LIKE '%Dev%' AND billing_type NOT LIKE '%Recl%' AND billing_type NOT LIKE '%Amost%'AND billing_type NOT LIKE '%Retorno%'and (sales_order_date >= '2019-01-01' or billing_date >= '2019-01-01')"

In [None]:
QUERY_PREMIUM = "SELECT * FROM "+ATHENA_SPG+".tb_spg_tg_premium where issuing_state not like '%issuing_state%'"

In [None]:
QUERY_PV = "SELECT desc_gpm,gpd_cod,state,sales_org_cod,month_year,volume FROM "+ATHENA_SPG+".tb_spg_support_pv ORDER BY month_year,desc_gpm,state,sales_org_cod;"

In [None]:
QUERY_PEX = "SELECT * FROM "+ATHENA_SPG+".tb_spg_support_pex_v2 ORDER BY month_year,gpm_cod,state,sales_org_cod;"

## Output Paths

In [None]:
SPG_OUTPUT_BUCKET_SALES = "SPG_STRATEGIC_FORUM/SF_STRATEGIC_FORUM/SPG_SF_SALES_PREV.parquet"

In [None]:
SPG_OUTPUT_BUCKET_SF = "SPG_STRATEGIC_FORUM/SF_STRATEGIC_FORUM/SPG_SF_STRATEGIC_FORUM.parquet"

## Boto3 Variables

In [None]:
#S3 Configuration
S3_ATHENA_INPUT =  's3://'+SPG_QUERY_BUCKET+'/'+SPG_QUERY_BUCKET_ATHENA

In [None]:
S3_ATHENA_OUTPUT = 's3://'+SPG_QUERY_BUCKET+'/'+SPG_QUERY_BUCKET_ATHENA

In [None]:
region_name = os.environ['AWS_REGION']

In [None]:
aws_access_key_id = os.environ['AWS_ACCESS_KEY']

In [None]:
aws_secret_access_key = os.environ['AWS_SECRET_KEY']

-------------

# Creating Defined Functions

In [None]:
# Run Query

def run_query(query, database, s3_output):
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
            },
        ResultConfiguration={
            'OutputLocation': s3_output,
            }
        )
    return response

In [None]:
def get_aws_path(query,database,s3_output):
    response = run_query(query, database, s3_output)
    file_query = response['QueryExecutionId']
    file_metadata = response['QueryExecutionId'] + '.metadata'
    return file_query

In [6]:
# Wating for 300 seconds until the end of the upload

def wait_athena_load(Bucket, Key):
    time_to_wait = 300
    time_counter = 0

    while True:
        try:
            s3.meta.client.head_object(Bucket=Bucket,Key=Key)
        except ClientError:
            time.sleep(1)
            time_counter += 1
            if time_counter > time_to_wait:
                break
        else:
            break

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# Replacing Characters

def normalizing_date_hifen(col_name):
    removed_array = [('-', '/')]
    r = col_name
    for a, b in removed_array:
        r = regexp_replace(r, a, b)
    return r

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
def remove_some_accents(col_name):
    removed_array = [('Á', 'A'), ('Ã', 'A'),('À', 'A'),('Â', 'A'),('Ä', 'A'),
                    ('É', 'E'),('È', 'E'),('Ê', 'E'),('Ë', 'E'),
                    ('Í', 'I'),('Ì', 'I'),('Î', 'I'),('Ï', 'I'),
                    ('Ó', 'O'),('Õ', 'O'), ('Ò', 'O'),('Ô', 'O'),('Ö', 'O'),
                    ('Ú', 'U'),('Ù', 'U'),('Û', 'U'),('Ü', 'U'),
                    ('Ç', 'C')]
    r = col_name
    for a, b in removed_array:
        r = regexp_replace(r, a, b)
    return r

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
def replace_some_space(col_name):
    removed_chars = (" ")
    regexp = "|".join('\{0}'.format(i) for i in removed_chars)
    return regexp_replace(col_name, regexp, "")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
def replace_some_apostrophe(col_name):
    removed_chars = ("'")
    regexp = "|".join('\{0}'.format(i) for i in removed_chars)
    return regexp_replace(col_name, regexp, "")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Removing hifens
def replace_some_hifen(col_name):
    removed_chars = ("-")
    regexp = "|".join('\{0}'.format(i) for i in removed_chars)
    return regexp_replace(col_name, regexp, "")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-------------

# Configuring Boto3 Connection

In [None]:
#Athena Client Configuration

client = boto3.client('athena', 
    aws_access_key_id = aws_access_key_id, 
    aws_secret_access_key = aws_secret_access_key, 
    region_name = region_name )

In [None]:
#S3 Resource Configuration

s3 = boto3.resource('s3',
    aws_access_key_id = aws_access_key_id,
    aws_secret_access_key = aws_secret_access_key,
    region_name = region_name)

----------------

# Importing Tables

In [None]:
# SPG Support Tabel Calendar

df_calendar = spark.read.parquet("s3a://"+SPG_INPUT_BUCKET+"/"+SPG_INPUT_BUCKET_CALENDAR)

In [None]:
df_calendar.write.partitionBy(144);
df_calendar = df_calendar.repartition(144);
df_calendar.persist(pyspark.StorageLevel.MEMORY_ONLY)

In [None]:
# Import CSV from View

athena_response = get_aws_path(QUERY_VIEW,ATHENA_SPG,S3_ATHENA_OUTPUT)

wait_athena_load(SPG_QUERY_BUCKET, SPG_QUERY_BUCKET_ATHENA+"/"+athena_response+".csv")

In [None]:
# Import CSV from View

df_view = spark.read.csv(view_path, header = 'true')

In [None]:
# Creating partition with GPM

df_view.write.partitionBy("gpm")

In [None]:
# Repartitioning by GPM

df_view = df_view.repartition("gpm")

In [None]:
df_view.persist(pyspark.StorageLevel.MEMORY_ONLY)

In [None]:
# Import CSV from View

athena_response = get_aws_path(QUERY_PREMIUM,ATHENA_SPG,S3_ATHENA_OUTPUT)

wait_athena_load(SPG_QUERY_BUCKET, SPG_QUERY_BUCKET_ATHENA+"/"+athena_response+".csv")

In [None]:
# Import CSV from View

df_target = spark.read.csv(view_path, header = 'true')

In [None]:
# Creating partition with GPM

df_target.write.partitionBy("issuing_state")

In [None]:
# Repartitioning by GPM

df_target = df_target.repartition("issuing_state")

In [None]:
df_target.persist(pyspark.StorageLevel.MEMORY_ONLY)

In [None]:
df_target = df_target.drop("gpd_cod"
                          ,"dt_update")

In [None]:
# Import CSV from View

athena_response = get_aws_path(QUERY_PV,ATHENA_SPG,S3_ATHENA_OUTPUT)

wait_athena_load(SPG_QUERY_BUCKET, SPG_QUERY_BUCKET_ATHENA+"/"+athena_response+".csv")

In [None]:
# Import CSV from View

df_pv = spark.read.csv(view_path, header = 'true')

In [None]:
# Creating partition with GPM

df_pv.write.partitionBy("desc_gpm")

In [None]:
# Repartitioning by GPM

df_pv = df_pv.repartition("desc_gpm")

In [None]:
df_pv.persist(pyspark.StorageLevel.MEMORY_ONLY)

In [None]:
# Import CSV from View

athena_response = get_aws_path(QUERY_PEX,ATHENA_SPG,S3_ATHENA_OUTPUT)

wait_athena_load(SPG_QUERY_BUCKET, SPG_QUERY_BUCKET_ATHENA+"/"+athena_response+".csv")

In [None]:
# Import CSV from View

df_pex = spark.read.csv(view_path, header = 'true')

In [None]:
# Creating partition with GPM

df_pex.write.partitionBy("gpm_cod")

In [None]:
# Repartitioning by GPM

df_pex = df_pex.repartition("gpm_cod")

In [None]:
df_pex.persist(pyspark.StorageLevel.MEMORY_ONLY)

------------------

# Preparing Calendar

In [34]:
# Selecting Necessary Columns

df_calendar=df_calendar.select(df_calendar.CURRENT_DATE,df_calendar.CURRENT_WEEKEND)\

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [35]:
# Converting from String to DateTime

df_calendar=df_calendar.withColumn("CURRENT_DATE",from_unixtime(unix_timestamp(normalizing_date_hifen(trim(col("CURRENT_DATE"))), 'dd/MM/yyyy')))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [36]:
# Creating Month Year Column and Filtering Business Days

df_calendar=df_calendar.withColumn("month_year", concat(month(df_calendar.CURRENT_DATE)
                                                             ,year(df_calendar.CURRENT_DATE)))\
                        .filter(df_calendar.CURRENT_WEEKEND.like('FALSE'))\
                        .dropDuplicates()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [37]:
# Counting Quantity of Business Days 

df_calendar=df_calendar.groupBy("month_year").agg(count(df_calendar.CURRENT_DATE).alias("business_days"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [38]:
# Creating partition with Month Year

df_calendar.write.partitionBy("month_year")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.readwriter.DataFrameWriter object at 0x7f232473a3d0>

In [39]:
# Repartitioning by Month Year

df_calendar = df_calendar.repartition("month_year")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

------------------

# Preparing Strategy View 

In [40]:
df_view=df_view.withColumn("sales_order_date",from_unixtime(unix_timestamp(normalizing_date_hifen(trim(col("sales_order_date"))), 'yyyy/MM/dd')))\
                .withColumn("billing_date",from_unixtime(unix_timestamp(normalizing_date_hifen(trim(col("billing_date"))), 'yyyy/MM/dd')))\
                .withColumn("manufacture", when(df_view.original_manufacture.like('%Gerdau Praticado%')|
                                                df_view.original_manufacture.like('%Gerdau Carteira%'),"Gerdau Praticado")\
                                               .otherwise("Concorrentes"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [41]:
df_view=df_view.withColumn("month_year", concat(month(df_view.sales_order_date),year(df_view.sales_order_date)))\
                .withColumn("sales_month", month(df_view.sales_order_date))\
                .withColumn("sales_year", year(df_view.sales_order_date))\

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [42]:
df_view=df_view.join(df_calendar, on=["month_year"], how="left")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [43]:
df_view.persist(pyspark.StorageLevel.MEMORY_ONLY)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [44]:
df_view=df_view.withColumn("key_soc_state_branch_gpd_month_year", concat(df_view.sales_org_cod
                                                                         ,df_view.issuing_state
                                                                         ,df_view.branch
                                                                         ,df_view.gpd_desc
                                                                         ,df_view.month_year))\

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [45]:
df_view = df_view.withColumn("key_soc_state_branch_gpd_month_year_manufacture", concat(df_view.key_soc_state_branch_gpd_month_year
                                                                                      ,df_view.manufacture))\
                .withColumn("key_state_branch_gpd_month_year_material", concat(df_view.issuing_state
                                                                               ,df_view.branch
                                                                               ,df_view.gpd_desc
                                                                               ,df_view.month_year
                                                                               ,df_view.material_cod))\

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [46]:
df_view = df_view.withColumn("key_soc_state_branch_gpd_month_year_material", concat(df_view.sales_org_cod
                                                                                   ,df_view.key_state_branch_gpd_month_year_material))\

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [47]:
df_view=df_view.drop("month_year")\

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [48]:
# Converting numeric(and text) columns to float 
for col_name in ["quantity_ton"
                ,"list_price"
                ,"zd01"
                ,"zd02"
                ,"zd03"
                ,"zd04"
                ,"zd05"
                ,"zeb1"
                ,"zeb2"
                ,"zeaf"
                ,"zd13"
                ,"zp30"
                ,"zsu2"
                ,"zp20"
                ,"bx41"
                ,"verge"
                ,"rbv"
                ,"icms"
                ,"rlv"
                ,"practiced_price"
                ,"norm_pract_price_kg"
                ,"norm_pract_price"
                ,"zd_total_d"
                ,"zd_total_p"]:
    df_view = df_view.withColumn(col_name, col(col_name).cast('float'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [49]:
df_view=df_view.withColumn("rlv_per_ton", col('rlv')/col('quantity_ton'))\
               .withColumn("gross_profit", col('rbv')*0.35)\

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-----

# Creating Columns with Competitors and  key_soc_state_branch_gpd_month_year_material

In [50]:
df_view_group = df_view.filter(df_view.manufacture.like('%Concorrentes%'))\
                              .groupBy("key_soc_state_branch_gpd_month_year_material")\
                              .agg(count(df_view.key_soc_state_branch_gpd_month_year_material)\
                                        .alias("count_competitor")
                                  ,avg(df_view.preco_prat_norm_kg)\
                                      .alias("competitor_price"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [51]:
df_view=df_view.join(df_view_group, on=["key_soc_state_branch_gpd_month_year_material"], how="left")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [52]:
df_view=df_view.fillna({'count_competitor':0})

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [53]:
df_view.persist(pyspark.StorageLevel.MEMORY_ONLY)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[key_soc_state_branch_gpd_month_year_material: string, billing_number: string, sales_item: string, sales_order_date: string, branch: string, sales_org_cod: string, billing_date: string, sales_number: string, material_cod: string, material_desc: string, gpd_cod: string, gpd_desc: string, issuing_city: string, issuing_state: string, gpm: string, customer_group_desc: string, receiving_customer_cod: string, receiving_customer_desc: string, fabricante: string, aprovado: string, quantity_ton: float, preco_lista: float, zd01: float, zd02: float, zd03: float, zd04: float, zd05: float, zeb1: float, zeb2: float, zeaf: float, zd13: float, zd_total_d: float, zd_total_p: float, zp30: float, zsu2: float, zp20: float, bx41: float, alcada: float, desvio_alcada: string, rbv: float, icms: float, rlv: float, practiced_price: float, preco_prat_norm: float, preco_prat_norm_kg: float, preco_politica: string, confirmed_ton: string, manufacture: string, sales_month: int, sales_year: int, business_day

--------------

# Creating Columns with Gerdau Practiced and  key_soc_state_branch_gpd_month_year_material

In [54]:
df_view_group = df_view.filter(df_view.manufacture.like('%Gerdau Praticado%'))\
                           .groupBy("key_soc_state_branch_gpd_month_year_material")\
                           .agg(avg(df_view.preco_prat_norm_kg)\
                                   .alias("gerdau_practiced_price"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [55]:
df_view=df_view.join(df_view_group, on=["key_soc_state_branch_gpd_month_year_material"], how="left")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [56]:
df_view.persist(pyspark.StorageLevel.MEMORY_ONLY)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[key_soc_state_branch_gpd_month_year_material: string, billing_number: string, sales_item: string, sales_order_date: string, branch: string, sales_org_cod: string, billing_date: string, sales_number: string, material_cod: string, material_desc: string, gpd_cod: string, gpd_desc: string, issuing_city: string, issuing_state: string, gpm: string, customer_group_desc: string, receiving_customer_cod: string, receiving_customer_desc: string, fabricante: string, aprovado: string, quantity_ton: float, preco_lista: float, zd01: float, zd02: float, zd03: float, zd04: float, zd05: float, zeb1: float, zeb2: float, zeaf: float, zd13: float, zd_total_d: float, zd_total_p: float, zp30: float, zsu2: float, zp20: float, bx41: float, alcada: float, desvio_alcada: string, rbv: float, icms: float, rlv: float, practiced_price: float, preco_prat_norm: float, preco_prat_norm_kg: float, preco_politica: string, confirmed_ton: string, manufacture: string, sales_month: int, sales_year: int, business_day

---------------

# Creating ZD13 % Column

In [57]:
df_view=df_view.withColumn("zd13_2_p100", (col("list_price") -
                                          col("zd01") -
                                          col("zd02") -
                                          col("zd03") -
                                          col("zd04") -
                                          col("zd05") -
                                          col("zeb1") +
                                          col("zeb2") +
                                          col("zeaf") +
                                          col("zp30") +
                                          col("zsu2") +
                                          col("zp20"))*0.02)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [58]:
df_view=df_view.fillna({'zd13_2_p100':0})

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

----------

# Creating Adjusted Politic Price

In [59]:
df_view=df_view.withColumn("adj_politic_price", when(df_view.fabricante.like("%Carteira%"), ((col("norm_pract_price")+
                                                                                            col("zd_total_d")-
                                                                                            col("zd13_2_p100"))/col("confirmed_ton"))/1000)\
                                                   .otherwise(((col("norm_pract_price")+
                                                                col("zd_total_d")-
                                                                col("zd13_2_p100"))/col("quantity_ton"))/1000))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [60]:
# Creating partition with Month Year

df_view.write.partitionBy("gpm")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.readwriter.DataFrameWriter object at 0x7f232473a410>

In [61]:
# Repartitioning by GPM

df_view = df_view.repartition("gpm")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [62]:
df_view.persist(pyspark.StorageLevel.MEMORY_ONLY)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

------------------------

# Creating Average Adjusted Politic Price

In [63]:
df_view_group = df_view.filter(df_view.manufacture.like('%Gerdau Praticado%'))\
                        .groupBy("key_soc_state_branch_gpd_month_year_material")\
                        .agg(avg(df_view.adj_politic_price)\
                                 .alias("avg_politic_price"))\

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [64]:
df_view_group=df_view_group.withColumn("min_politic_price", when(df_view_group.avg_politic_price==0, '')\
                                                                .otherwise(df_view_group.avg_politic_price))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [65]:
df_view=df_view.join(df_view_group, on=["key_soc_state_branch_gpd_month_year_material"], how="left")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

------------

# Creating CG and DI Price

In [66]:
df_view_group = df_view.filter(df_view.manufacture.like('%Gerdau Praticado%'))\
                        .filter(df_view.sales_org_cod.like('%BRCG%'))\
                        .groupBy("key_state_branch_gpd_month_year_material")\
                        .agg(avg(df_view.rlv_per_ton)\
                                 .alias("brcg_price"))\

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [67]:
df_view=df_view.join(df_view_group, on=["key_state_branch_gpd_month_year_material"], how="left")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [68]:
df_view_group = df_view.filter(df_view.manufacture.like('%Gerdau Praticado%'))\
                        .filter(df_view.sales_org_cod.like('%BRDI%'))\
                        .groupBy("key_state_branch_gpd_month_year_material")\
                        .agg(avg(df_view.rlv_per_ton)\
                                 .alias("brdi_price"))\

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [69]:
df_view=df_view.join(df_view_group, on=["key_state_branch_gpd_month_year_material"], how="left")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [70]:
df_view.persist(pyspark.StorageLevel.MEMORY_ONLY)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[key_state_branch_gpd_month_year_material: string, key_soc_state_branch_gpd_month_year_material: string, billing_number: string, sales_item: string, sales_order_date: string, branch: string, sales_org_cod: string, billing_date: string, sales_number: string, material_cod: string, material_desc: string, gpd_cod: string, gpd_desc: string, issuing_city: string, issuing_state: string, gpm: string, customer_group_desc: string, receiving_customer_cod: string, receiving_customer_desc: string, fabricante: string, aprovado: string, quantity_ton: float, preco_lista: float, zd01: float, zd02: float, zd03: float, zd04: float, zd05: float, zeb1: float, zeb2: float, zeaf: float, zd13: float, zd_total_d: float, zd_total_p: float, zp30: float, zsu2: float, zp20: float, bx41: float, alcada: float, desvio_alcada: string, rbv: float, icms: float, rlv: float, practiced_price: float, preco_prat_norm: float, preco_prat_norm_kg: float, preco_politica: string, confirmed_ton: string, manufacture: strin

-------------------

# Creating Premium Columns

In [71]:
df_view=df_view.withColumn("practiced_premium",(col("gerdau_practiced_price")/col("competitor_price"))-1)\
                .withColumn("mark_up",(col("brcg_price")/col("brdi_price"))-1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [72]:
df_view=df_view.withColumn("practiced_premium_sample", when(col('count_competitor')>10, col('practiced_premium'))\
                                                           .otherwise(''))\
                .withColumn("politic_premium", when(col('count_competitor')<=10, (col('avg_politic_price')/col('competitor_price'))-1)\
                                                    .otherwise(''))\
                .withColumn("deviation_politic_premium", when(col('count_competitor')<=10, '')\
                                                            .otherwise((col('min_politic_price')/col('competitor_price'))-1))\

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [73]:
df_view_group = df_view.filter(df_view.manufacture.like('%Gerdau Praticado%'))\
                        .filter(df_view.sales_org_cod.like('%BRCG%'))\
                        .groupBy("key_state_branch_gpd_month_year_material")\
                        .agg(avg(df_view.rlv_per_ton)\
                                 .alias("brcg_price"))\

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [74]:
df_view=df_view.join(df_target.select(df_target.issuing_state.alias("issuing_state_drop")
                                     ,df_target.gpd_desc.alias("gpd_desc_drop")
                                     ,df_target.target_premium)
                     ,(trim(col("issuing_state"))==trim(col("issuing_state_drop"))) &
                     (trim(col("gpd_desc"))==trim(col("gpd_desc_drop"))), how="left")\
                .drop("gpd_desc_drop","issuing_state_drop")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [75]:
df_view=df_view.fillna({'target_premium':0.03})

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [76]:
df_view=df_view.withColumn("premium_deviation", col("practiced_premium_sample")-(col("target_premium")))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [77]:
df_view.persist(pyspark.StorageLevel.MEMORY_ONLY)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[key_state_branch_gpd_month_year_material: string, key_soc_state_branch_gpd_month_year_material: string, billing_number: string, sales_item: string, sales_order_date: string, branch: string, sales_org_cod: string, billing_date: string, sales_number: string, material_cod: string, material_desc: string, gpd_cod: string, gpd_desc: string, issuing_city: string, issuing_state: string, gpm: string, customer_group_desc: string, receiving_customer_cod: string, receiving_customer_desc: string, fabricante: string, aprovado: string, quantity_ton: float, preco_lista: float, zd01: float, zd02: float, zd03: float, zd04: float, zd05: float, zeb1: float, zeb2: float, zeaf: float, zd13: float, zd_total_d: float, zd_total_p: float, zp30: float, zsu2: float, zp20: float, bx41: float, alcada: float, desvio_alcada: string, rbv: float, icms: float, rlv: float, practiced_price: float, preco_prat_norm: float, preco_prat_norm_kg: float, preco_politica: string, confirmed_ton: string, manufacture: strin

----------

# Creating PO Column

In [78]:
df_view=df_view.withColumn("po", col("list_price") -
                                          col("zd01") -
                                          col("zd02") -
                                          col("zd03") -
                                          col("zd04") -
                                          col("zd05") -
                                          col("zeb1") +
                                          col("zeb2") +
                                          col("zeaf") +
                                          col("zp30") +
                                          col("zsu2") +
                                          col("zp20"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-------------

# Creating Adjusted Alçada Deviation

In [79]:
df_view=df_view.withColumn("adj_verge_deviation", when(col("zd_total_p")>col("verge"), col("po")*(col("zd_total_p")-col("verge")))\
                                                       .otherwise(lit(0)))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-----------------

# Correcting Columns

In [80]:
for col_name in ["quantity_ton"
                ,"list_price"
                ,"zd01"
                ,"zd02"
                ,"zd03"
                ,"zd04"
                ,"zd05"
                ,"zeb1"
                ,"zeb2"
                ,"zeaf"
                ,"zd13"
                ,"zd_total_d"
                ,"zd_total_p"
                ,"zp30"
                ,"zsu2"
                ,"zp20"
                ,"bx41"
                ,"verge"
                ,"verge_deviation"
                ,"rbv"
                ,"icms"
                ,"rlv"
                ,"practiced_price"
                ,"norm_pract_price"
                ,"norm_pract_price_kg"
                ,"politic_price"
                ,"confirmed_ton"
                ,"rlv_per_ton"
                ,"gross_profit"
                ,"competitor_price"
                ,"gerdau_practiced_price"
                ,"brcg_price"
                ,"brdi_price"
                ,"zd13_2_p100"
                ,"adj_politic_price"
                ,"avg_politic_price"
                ,"min_politic_price"
                ,"practiced_premium"
                ,"mark_up"
                ,"practiced_premium_sample"
                ,"politic_premium"
                ,"deviation_politic_premium"
                ,"premium_deviation"
                ,"po"
                ,"adj_verge_deviation"]:
    df_view=df_view.withColumn(col_name,col(col_name).cast("float"))\

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [81]:
df_view=df_view.withColumn("target_premium", col("target_premium").cast("double"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

----------

# Repartition

In [82]:
# Creating partition with Month Year

df_view.write.partitionBy("gpm")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.readwriter.DataFrameWriter object at 0x7f23246fc5d0>

In [83]:
# Repartitioning by Month Year

df_view = df_view.repartition("gpm")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [84]:
df_view.persist(pyspark.StorageLevel.MEMORY_ONLY)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[key_state_branch_gpd_month_year_material: string, key_soc_state_branch_gpd_month_year_material: string, billing_number: string, sales_item: string, sales_order_date: string, branch: string, sales_org_cod: string, billing_date: string, sales_number: string, material_cod: string, material_desc: string, gpd_cod: string, gpd_desc: string, issuing_city: string, issuing_state: string, gpm: string, customer_group_desc: string, receiving_customer_cod: string, receiving_customer_desc: string, fabricante: string, aprovado: string, quantity_ton: float, preco_lista: float, zd01: float, zd02: float, zd03: float, zd04: float, zd05: float, zeb1: float, zeb2: float, zeaf: float, zd13: float, zd_total_d: float, zd_total_p: float, zp30: float, zsu2: float, zp20: float, bx41: float, alcada: float, desvio_alcada: float, rbv: float, icms: float, rlv: float, practiced_price: float, preco_prat_norm: float, preco_prat_norm_kg: float, preco_politica: float, confirmed_ton: float, manufacture: string, 

# Uploading Table

In [85]:
df_view.write.parquet("s3a://"+SPG_OUTPUT_BUCKET+"/"+SPG_OUTPUT_BUCKET_SF, mode = "overwrite")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# PEX

In [86]:
df_pex=df_pex.fillna({'gpm_cod':'NA'
                      ,'gpd_cod':'NA'
                      ,'state':'NA'
                      ,'sales_org_cod':''
                      ,'month_year':''
                      ,'volume':'0'})

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [87]:
for col_name in ["gpm_cod","gpd_desc","sales_org_cod","state"]:
    df_pex=df_pex.withColumn(col_name, remove_some_accents(upper(replace_some_apostrophe(replace_some_space(col_name)))))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [88]:
df_pex = df_pex.withColumn("volume", col('volume').cast('float'))\
                .withColumn("rlv", col('rlv').cast('float'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [89]:
# create key column
df_pex = df_pex.withColumn("key_gpm_gpd_state_ov_year_month", concat(col('gpm_cod'),lit('_'),col('gpd_cod'),lit('_'),col('STATE'),lit('_'),col('SALES_ORG_COD'),lit('_'),col('MONTH_YEAR')))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [90]:
df_pex = df_pex.select([col(c).alias(c+"_pex") for c in df_pex.columns])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [91]:
df_pex.write.partitionBy("month_year_pex")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.readwriter.DataFrameWriter object at 0x7f2324726f90>

In [92]:
df_pex = df_pex.repartition("month_year_pex")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# PV

In [93]:
df_pv=df_pv.fillna({'desc_gpm':'NA'
                    ,'gpd_cod':'NA'
                    ,'state':'NA'
                    ,'sales_org_cod':''
                    ,'month_year':''
                    ,'volume':'0'})

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [94]:
for col_name in ["desc_gpm","state","sales_org_cod"]:
    df_pv=df_pv.withColumn(col_name, remove_some_accents(upper(replace_some_apostrophe(replace_some_space(col_name)))))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [95]:
# create key column
df_pv = df_pv.withColumn("volume", col('volume').cast('float'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [96]:
# create key 1
df_pv = df_pv.withColumn("key_gpm_gpd_state_ov_year_month", concat(col('desc_gpm')
                                                                   ,lit('_')
                                                                   ,col('gpd_cod')
                                                                   ,lit('_')
                                                                   ,col('state')
                                                                   ,lit('_')
                                                                   ,col('sales_org_cod')
                                                                   ,lit('_')
                                                                   ,col('month_year')))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [97]:
# create key 2
df_pv = df_pv.withColumn("key_gpm_gpd_state_ov", concat(col('desc_gpm')
                                                        ,lit('_')
                                                        ,col('gpd_cod')
                                                        ,lit('_')
                                                        ,col('state')
                                                        ,lit('_')
                                                        ,col('sales_org_cod')))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [98]:
df_pv = df_pv.groupBy('key_gpm_gpd_state_ov_year_month'
                      ,'key_gpm_gpd_state_ov'
                      ,'month_year'
                      ,'state'
                      ,'desc_gpm'
                      ,'gpd_cod'
                      ,'sales_org_cod')\
            .agg(sum(df_pv.volume).alias("volume"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [99]:
df_pv = df_pv.withColumn("year", substring(col('month_year'),1,4).cast('int'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [100]:
df_pv = df_pv.withColumn("month", substring(col('month_year'),5,6).cast('int'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# cruzar com view para obter rlv

# Input Strategic Forum

In [101]:
# SF
df_sf = df_view.select('sales_order_date'
                       ,'issuing_state'
                       ,'sales_org_cod'
                       ,'gpm'
                       ,'gpd_cod'
                       ,'original_manufacture'
                       ,'quantity_ton'
                       ,'rbv'
                       ,'rlv')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [102]:
# month_year
df_sf = df_sf.withColumn("month_year", concat(year('sales_order_date'),month('sales_order_date')))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [104]:
df_sf=df_sf.withColumn("month_year", concat(year(df_sf.sales_order_date)
                                            ,when(
                                                month(df_sf.sales_order_date) >= 10, month(df_sf.sales_order_date)
                                            )
                                            .otherwise(concat(lit("0"), month(df_sf.sales_order_date)))))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [105]:
# create key column
df_sf = df_sf.withColumn("key_gpm_gpd_state_ov_year_month", concat(col('gpm'),lit('_'),col('gpd_cod'),lit('_'),col('issuing_state'),lit('_'),col('sales_org_cod'),lit('_'),col('month_year')))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [106]:
df_sf= df_sf.filter(df_sf.original_manufacture.like('%Gerdau Praticado%'))\
                        .groupBy("key_gpm_gpd_state_ov_year_month","month_year")\
                        .agg(sum(df_sf.quantity_ton).alias("quantity_ton_delivered")
                            ,sum(df_sf.rbv).alias("rbv_delivered")
                            ,sum(df_sf.rlv).alias("rlv_delivered"))\

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [107]:
df_sf = df_sf.select([col(c).alias(c+"_sf") for c in df_sf.columns])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [108]:
df_sf.write.partitionBy("month_year_sf")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.readwriter.DataFrameWriter object at 0x7f2324726c90>

In [109]:
df_sf = df_sf.repartition("month_year_sf")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Join Strategic Forum with PV

In [110]:
df_join_sf = df_pv.join(df_sf, df_pv.key_gpm_gpd_state_ov_year_month == df_sf.key_gpm_gpd_state_ov_year_month_sf, how="left")\
                .select('key_gpm_gpd_state_ov_year_month'
                        ,'key_gpm_gpd_state_ov'
                        ,'month_year'
                        ,'state'
                        ,'sales_org_cod'
                        ,'desc_gpm'
                        ,'gpd_cod'
                        ,'volume'
                        ,'year'
                        ,'month'
                        ,'quantity_ton_delivered_sf'
                        ,'rbv_delivered_sf'
                        ,'rlv_delivered_sf')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [111]:
df_join_sf.write.partitionBy("month_year")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.readwriter.DataFrameWriter object at 0x7f23246fcdd0>

In [112]:
df_join_sf = df_join_sf.repartition("month_year")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# PV vari

In [113]:
df_pv2 = df_join_sf.select([col(c).alias(c+"_comp") for c in df_join_sf.columns])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [114]:
df_pv = df_join_sf.withColumn("year_calc", when(col("month") < "12", col("year")).otherwise(col("year")+1))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [115]:
df_pv = df_pv.withColumn("month_calc", when(col("month") < "12", col("month")+1).otherwise(lit(1)))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [116]:
df_pv.write.partitionBy("month_year")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.readwriter.DataFrameWriter object at 0x7f232470d310>

In [117]:
df_pv = df_pv.repartition("month_year")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [118]:
df_pv_join=df_pv.join(df_pv2, (df_pv.key_gpm_gpd_state_ov == df_pv2.key_gpm_gpd_state_ov_comp) & 
                              (df_pv.year_calc == df_pv2.year_comp) & 
                              (df_pv.month_calc == df_pv2.month_comp), how="left")\
                .select('key_gpm_gpd_state_ov_year_month'
                        ,'key_gpm_gpd_state_ov'
                        ,'state'
                        ,'sales_org_cod'
                        ,'desc_gpm'
                        ,'gpd_cod'
                        ,'volume'
                        ,'quantity_ton_delivered_sf'
                        ,'rbv_delivered_sf'
                        ,'rlv_delivered_sf'
                        ,'month_year'
                        ,'year'
                        ,'month'
                        ,'month_year_comp'
                        ,'volume_comp'
                        ,'quantity_ton_delivered_sf_comp'
                        ,'rbv_delivered_sf_comp'
                        ,'rlv_delivered_sf_comp')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [119]:
df_pv_join = df_pv_join.withColumn("volume_diff", (col("volume_comp") - col("volume")).cast('float'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [120]:
df_pv_join = df_pv_join.withColumn("rlv_diff", (col("rlv_delivered_sf") - col("rlv_delivered_sf_comp")).cast('float'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# calculando rlv futuro

In [122]:
df_vari_rlv_media= df_pv_join.groupBy("key_gpm_gpd_state_ov")\
                        .agg(avg(df_pv_join.rlv_diff).alias("gpm_gpd_state_ov_rlv_avg"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [123]:
df_vari_rlv_media=df_vari_rlv_media.withColumnRenamed("key_gpm_gpd_state_ov","key")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [124]:
df_pv_join=df_pv_join.join(df_vari_rlv_media, (df_pv_join.key_gpm_gpd_state_ov == df_vari_rlv_media.key), how="inner")\
                .select('key_gpm_gpd_state_ov_year_month'
                        ,'key_gpm_gpd_state_ov'
                        ,'month_year'
                        ,'year'
                        ,'month'
                        ,'state'
                        ,'sales_org_cod'
                        ,'desc_gpm'
                        ,'volume'
                        ,'quantity_ton_delivered_sf'
                        ,'rbv_delivered_sf'
                        ,'rlv_delivered_sf'
                        ,'year'
                        ,'month'
                        ,'month_year_comp'
                        ,'volume_comp'
                        ,'volume_diff'
                        ,'quantity_ton_delivered_sf_comp'
                        ,'rbv_delivered_sf_comp'
                        ,'rlv_delivered_sf_comp'
                        ,'gpm_gpd_state_ov_rlv_avg')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [125]:
df_pv_join = df_pv_join.select([col(c).alias(c+"_pv") for c in df_pv_join.columns])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [126]:
df_pv_join.write.partitionBy("month_year_pv")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.readwriter.DataFrameWriter object at 0x7f2324726650>

In [127]:
df_pv_join = df_pv_join.repartition("month_year_pv")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# JOIN PEX AND PV

In [128]:
df_pex=df_pex.groupBy("gpm_cod_pex"
                      ,"gpd_cod_pex"
                      ,"gpd_desc_pex"
                      ,"sales_org_cod_pex"
                      ,"state_pex"
                      ,"month_year_pex"
                      ,"key_gpm_gpd_state_ov_year_month_pex")\
            .agg(sum("volume_pex").alias("volume_pex")
                ,sum("rlv_pex").alias("rlv_pex"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [129]:
df_join_pex_pv = df_pv_join.join(df_pex, df_pex.key_gpm_gpd_state_ov_year_month_pex == df_pv_join.key_gpm_gpd_state_ov_year_month_pv, how="inner")\
                            .dropDuplicates()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [130]:
df_join_pex_pv=df_join_pex_pv.withColumn("month_year", when(df_join_pex_pv.month_year_pex.isNull(), df_join_pex_pv.month_year_pv)\
                                                           .otherwise(df_join_pex_pv.month_year_pex))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [131]:
df_join_pex_pv=df_join_pex_pv.select(df_join_pex_pv['key_gpm_gpd_state_ov_year_month_pex'].alias("key_gpm_gpd_state_ov_year_month")
                                     ,df_join_pex_pv['key_gpm_gpd_state_ov_pv'].alias("key_gpm_gpd_state_ov")
                                     ,df_join_pex_pv['month_year']
                                     ,df_join_pex_pv['state_pex'].alias("state")
                                     ,df_join_pex_pv['sales_org_cod_pex'].alias("sales_org_cod")
                                     ,df_join_pex_pv['gpm_cod_pex'].alias("gpm_cod")
                                     ,df_join_pex_pv['gpd_cod_pex'].alias("gpd_cod")
                                     ,df_join_pex_pv['gpd_desc_pex'].alias("gpd_desc")
                                     ,df_join_pex_pv['volume_pex'].cast("float")
                                     ,df_join_pex_pv['rlv_pex'].cast("float")
                                     ,df_join_pex_pv['volume_pv'].cast("float")
                                     ,df_join_pex_pv['month_year_comp_pv']
                                     ,df_join_pex_pv['volume_comp_pv'].cast("float")
                                     ,df_join_pex_pv['quantity_ton_delivered_sf_pv'].cast("float")
                                     ,df_join_pex_pv['quantity_ton_delivered_sf_comp_pv'].cast("float")
                                     ,df_join_pex_pv['rbv_delivered_sf_comp_pv'].cast("float")
                                     ,df_join_pex_pv['rlv_delivered_sf_comp_pv'].cast("float")
                                     ,df_join_pex_pv['volume_diff_pv'].cast("float")
                                     ,df_join_pex_pv['gpm_gpd_state_ov_rlv_avg_pv'].cast("float")
                                     ,concat(col("sales_org_cod_pex")
                                             ,col("state_pex")
                                             ,col("gpd_desc_pex")
                                             ,col("month_year")).alias("key_soc_state_gpd_month_year")
                                     ,substring(col("month_year"), 1, 4).alias("year")
                                     ,substring(col("month_year"), 5, 6).alias("month"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [132]:
for col_name in ['volume_pex'
                 ,'rlv_pex'
                 ,'volume_pv'
                 ,'volume_comp_pv'
                 ,'quantity_ton_delivered_sf_pv'
                 ,'quantity_ton_delivered_sf_comp_pv'
                 ,'rbv_delivered_sf_comp_pv'
                 ,'rlv_delivered_sf_comp_pv'
                 ,'volume_diff_pv'
                 ,'gpm_gpd_state_ov_rlv_avg_pv']:
    df_join_pex_pv=df_join_pex_pv.withColumn(col_name,col(col_name).cast("float"))\

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [133]:
df_join_pex_pv.write.partitionBy("month_year")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.readwriter.DataFrameWriter object at 0x7f23247d1ad0>

In [134]:
df_join_pex_pv = df_join_pex_pv.repartition("month_year")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [135]:
df_join_pex_pv.persist(pyspark.StorageLevel.MEMORY_ONLY)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[key_gpm_gpd_state_ov_year_month: string, key_gpm_gpd_state_ov: string, month_year: string, state: string, sales_org_cod: string, gpm_cod: string, gpd_cod: string, gpd_desc: string, volume_pex: float, rlv_pex: float, volume_pv: float, month_year_comp_pv: string, volume_comp_pv: float, quantity_ton_entrega_sf_pv: float, quantity_ton_entrega_sf_comp_pv: float, rbv_entrega_sf_comp_pv: float, rlv_entrega_sf_comp_pv: float, volume_diff_pv: float, gpm_gpd_state_ov_rlv_avg_pv: float, key_soc_state_gpd_month_year: string, year: string, month: string]

In [136]:
df_join_pex_pv.write.parquet("s3a://"+SPG_OUTPUT_BUCKET+"/"+SPG_OUTPUT_BUCKET_SF, mode = "overwrite")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…