In [49]:
import os
import sys
import itertools
import warnings 

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import pyspark.sql.functions as f
import subprocess
import datetime
from pyspark.sql.window import Window


In [28]:
from pyspark.sql import HiveContext
hive = HiveContext(sc) 
hive.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

In [29]:
RUN_DATE = datetime.date.today()
print(RUN_DATE)

2020-09-23


# Load updated transaction data directly from source

In [28]:
hist_trx = "s3://formation-samba-us-west-2-seabright-samba-sources/transaction/historical/load_dt=2020-09-15"
hist_trx = spark.read.csv(hist_trx,header=True)


In [29]:
hist_trx =hist_trx.withColumn('load_date',f.lit('2020-09-15').cast('date'))

In [30]:
hist_trx.take(1)

[Row(id_customer='000159ba66919ddf7c703d1b0ff79fb2', clube=None, clube_plano=None, tier='Ouro', alias_name='G3', data_processamento='2017-09-19T00:00', data_transacao='2017-09-01T00:00', safra='201709', id_transacao='1-109DS6BM', nome_parceiro='GOL LINHAS AEREAS', tipo_parceiro='GOL', tipo_produto='CREDITO RETROATIVO', tipo_milhas='Milhas', data_voo='2017-09-01T00:00', qtd_bilhetes=None, nr_voo='1603', pnr='AMHBRK', od='BPSROS', aeroporto_origem='BPS', continente_origem=None, aeroporto_destino='ROS', continente_destino=None, tipo_viagem=None, teto_diamante=None, viaje_facil=None, voucher='0', assento='0', bagagem='0', classe=None, tp_tarifa=None, parte_milhas=None, parte_money=None, valor_money=None, volume_milhas='2209', canal_operacao='DEMAIS CANAIS', tp_transacao='ACCRUAL', load_date=datetime.date(2020, 9, 15))]

In [33]:
hist_trx.select(f.to_timestamp('data_transacao').cast('date').alias('yoo')).distinct().sort(f.col('yoo').desc()).show()

+----------+
|       yoo|
+----------+
|2020-09-07|
|2020-09-06|
|2020-09-05|
|2020-09-04|
|2020-09-03|
|2020-09-02|
|2020-09-01|
|2020-08-31|
|2020-08-30|
|2020-08-29|
|2020-08-28|
|2020-08-27|
|2020-08-26|
|2020-08-25|
|2020-08-24|
|2020-08-23|
|2020-08-22|
|2020-08-21|
|2020-08-20|
|2020-08-19|
+----------+
only showing top 20 rows



In [5]:
daily_trx = "s3://formation-samba-us-west-2-seabright-samba-sources/transaction/v1/*/*.csv"
daily_trx = spark.read.csv(daily_trx,header=True,sep=",")
daily_trx = daily_trx.withColumn('filename',f.input_file_name())

In [39]:
daily_trx = daily_trx.withColumn('data_processamento',f.to_timestamp('data_processamento').cast('date'))\
            .withColumn('data_transacao',f.to_timestamp('data_transacao').cast('date'))\
            .withColumn('data_voo',f.to_timestamp('data_voo'))\
            .withColumn('qtd_bilhetes',f.coalesce(f.col('qtd_bilhetes').cast('int'),f.lit(0)))\
            .withColumn('teto_diamante',f.coalesce(f.col('teto_diamante').cast('int'),f.lit(0)))\
            .withColumn('viaje_facil',f.coalesce(f.col('viaje_facil').cast('int'),f.lit(0)))\
            .withColumn('voucher',f.coalesce(f.col('voucher').cast('int'),f.lit(0)))\
            .withColumn('assento',f.coalesce(f.col('assento').cast('int'),f.lit(0)))\
            .withColumn('bagagem',f.coalesce(f.col('bagagem').cast('int'),f.lit(0)))\
            .withColumn('parte_milhas',f.coalesce(f.col('parte_milhas').cast('long'),f.lit(0)))\
            .withColumn('parte_money',f.coalesce(f.col('parte_money').cast('long'),f.lit(0)))\
            .withColumn('valor_money',f.coalesce(f.col('valor_money').cast('long'),f.lit(0)))\
            .withColumn('volume_milhas',f.coalesce(f.col('volume_milhas').cast('long'),f.lit(0)))\
            .withColumn('tp_transacao',f.when(f.col('tp_transacao')=='REDEPMTION AIRLINE','REDEMPTION AIRLINE')
                                    .otherwise(f.col('tp_transacao')))\
            .withColumn('load_date',f.split(f.input_file_name(),"/").getItem(5).cast('date'))

In [34]:
hist_trx = hist_trx\
            .withColumn('data_processamento',f.to_timestamp('data_processamento').cast('date'))\
            .withColumn('data_transacao',f.to_timestamp('data_transacao').cast('date'))\
            .withColumn('data_voo',f.to_timestamp('data_voo'))\
            .withColumn('qtd_bilhetes',f.coalesce(f.col('qtd_bilhetes').cast('int'),f.lit(0)))\
            .withColumn('teto_diamante',f.coalesce(f.col('teto_diamante').cast('int'),f.lit(0)))\
            .withColumn('viaje_facil',f.coalesce(f.col('viaje_facil').cast('int'),f.lit(0)))\
            .withColumn('voucher',f.coalesce(f.col('voucher').cast('int'),f.lit(0)))\
            .withColumn('assento',f.coalesce(f.col('assento').cast('int'),f.lit(0)))\
            .withColumn('bagagem',f.coalesce(f.col('bagagem').cast('int'),f.lit(0)))\
            .withColumn('parte_milhas',f.coalesce(f.col('parte_milhas').cast('long'),f.lit(0)))\
            .withColumn('parte_money',f.coalesce(f.col('parte_money').cast('long'),f.lit(0)))\
            .withColumn('valor_money',f.coalesce(f.col('valor_money').cast('long'),f.lit(0)))\
            .withColumn('volume_milhas',f.coalesce(f.col('volume_milhas').cast('long'),f.lit(0)))\
            .withColumn('tp_transacao',f.when(f.col('tp_transacao')=='REDEPMTION AIRLINE','REDEMPTION AIRLINE')
                                    .otherwise(f.col('tp_transacao')))

In [None]:
hist_trx.select('tp_transacao').distinct().show(100,False)

In [35]:
hist_trx.select('data_transacao').groupby('data_transacao').count().sort(f.col('data_transacao').desc()).take(10)

[Row(data_transacao=datetime.date(2020, 9, 7), count=21285),
 Row(data_transacao=datetime.date(2020, 9, 6), count=19808),
 Row(data_transacao=datetime.date(2020, 9, 5), count=220326),
 Row(data_transacao=datetime.date(2020, 9, 4), count=47821),
 Row(data_transacao=datetime.date(2020, 9, 3), count=41586),
 Row(data_transacao=datetime.date(2020, 9, 2), count=54635),
 Row(data_transacao=datetime.date(2020, 9, 1), count=37564),
 Row(data_transacao=datetime.date(2020, 8, 31), count=39645),
 Row(data_transacao=datetime.date(2020, 8, 30), count=22369),
 Row(data_transacao=datetime.date(2020, 8, 29), count=17593)]

In [56]:
cus_trans = daily_trx.filter(f.col('data_processamento') > f.lit('2020-09-08'))\
.union(hist_trx.filter(f.col('data_processamento') != f.lit('2029-07-10')))

In [41]:
spark.sql('''drop table if exists clean.transactions''')

DataFrame[]

In [44]:
 query='''  CREATE TABLE IF NOT EXISTS clean.transactions (
   id_customer string ,
    clube string ,
    clube_plano string ,
    tier string ,
    alias_name string ,
    data_processamento date ,
    data_transacao date ,
    safra string ,
    id_transacao string ,
    nome_parceiro string ,
    tipo_parceiro string ,
    tipo_produto string ,
    tipo_milhas string ,
    data_voo timestamp ,
    qtd_bilhetes integer ,
    nr_voo string ,
    pnr string ,
    od string ,
    aeroporto_origem string ,
    continente_origem string ,
    aeroporto_destino string ,
    continente_destino string ,
    tipo_viagem string ,
    teto_diamante integer ,
    viaje_facil integer ,
    voucher integer ,
    assento integer ,
    bagagem integer ,
    classe string ,
    tp_tarifa string ,
    parte_milhas long ,
    parte_money long ,
    valor_money long ,
    volume_milhas long ,
    canal_operacao string ,
    tp_transacao string )
    PARTITIONED BY (load_date DATE)
    STORED AS PARQUET
    LOCATION 's3://formation-samba-us-west-2-seabright-samba-derived/clean/transactions'
    '''
spark.sql(query)

DataFrame[]

In [48]:
daily_trx.filter(f.col('data_processamento') > f.lit('2020-09-08'))\
.drop('filename')\
.write.format("parquet")\
.mode('append')\
.insertInto("clean.transactions")


In [25]:
# get customer transactions

query = """ select max(load_date) from clean.transactions """

max_load_date = str(spark.sql(query).take(1)[0][0])
print(max_load_date)


cmd = "aws s3 ls --recursive  s3://formation-samba-us-west-2-seabright-samba-sources/transaction/v1/"
wow = subprocess.check_output(cmd.split())
bsn = 's3://formation-samba-us-west-2-seabright-samba-sources/'
daily_trx = [bsn+a.decode('utf-8') for a in wow.split() if a.decode('utf-8').endswith('csv')]
dates = [a.split('v1/')[1].split('/')[0] for a in daily_trx]


ix = dates.index(max_load_date)
trx_append = spark.read.csv(daily_trx[ix+1:],header=True)
trx_append = trx_append.withColumn('filename',f.input_file_name())\
            .withColumn('data_processamento',f.to_timestamp('data_processamento').cast('date'))\
            .withColumn('data_transacao',f.to_timestamp('data_transacao').cast('date'))\
            .withColumn('data_voo',f.to_timestamp('data_voo'))\
            .withColumn('qtd_bilhetes',f.coalesce(f.col('qtd_bilhetes').cast('int'),f.lit(0)))\
            .withColumn('teto_diamante',f.coalesce(f.col('teto_diamante').cast('int'),f.lit(0)))\
            .withColumn('viaje_facil',f.coalesce(f.col('viaje_facil').cast('int'),f.lit(0)))\
            .withColumn('voucher',f.coalesce(f.col('voucher').cast('int'),f.lit(0)))\
            .withColumn('assento',f.coalesce(f.col('assento').cast('int'),f.lit(0)))\
            .withColumn('bagagem',f.coalesce(f.col('bagagem').cast('int'),f.lit(0)))\
            .withColumn('parte_milhas',f.coalesce(f.col('parte_milhas').cast('long'),f.lit(0)))\
            .withColumn('parte_money',f.coalesce(f.col('parte_money').cast('long'),f.lit(0)))\
            .withColumn('valor_money',f.coalesce(f.col('valor_money').cast('long'),f.lit(0)))\
            .withColumn('volume_milhas',f.coalesce(f.col('volume_milhas').cast('long'),f.lit(0)))\
            .withColumn('tp_transacao',f.when(f.col('tp_transacao')=='REDEPMTION AIRLINE','REDEMPTION AIRLINE')
                                    .otherwise(f.col('tp_transacao')))\
            .withColumn('load_date',f.split(f.input_file_name(),"/").getItem(5).cast('date'))

trx_append.groupby().agg(f.max('data_transacao'),f.min('data_transacao'),f.min('load_date'),f.max('load_date')).show()

In [30]:
trx_append.drop('filename')\
.write.format("parquet")\
.mode('append')\
.insertInto("clean.transactions")

In [33]:
spark.sql('''show partitions clean.transactions''').show(20,False)

+--------------------+
|partition           |
+--------------------+
|load_date=2020-09-08|
|load_date=2020-09-09|
|load_date=2020-09-10|
|load_date=2020-09-11|
|load_date=2020-09-12|
|load_date=2020-09-13|
|load_date=2020-09-14|
|load_date=2020-09-15|
|load_date=2020-09-16|
|load_date=2020-09-17|
|load_date=2020-09-18|
|load_date=2020-09-19|
|load_date=2020-09-20|
|load_date=2020-09-21|
|load_date=2020-09-22|
+--------------------+



In [9]:
# path ="s3://formation-samba-us-west-2-seabright-samba-sources/demographic/v1/2020-08-28/Base_Demografica_Formation_20200828.csv"
demos_path = "s3://formation-samba-us-west-2-seabright-samba-sources/demographic/v1/*/*.CSV"
cus_demos = spark.read.csv(demos_path,header=True,sep=",")
cus_demos = cus_demos.filter(f.col('id_customer') != '411801aa193970bae7a6f641b6cc115b')\
                     .withColumn('load_date',f.split(f.input_file_name(),'/').getItem(5).cast('date'))\
                    .withColumn('filename',f.input_file_name())
                    

In [11]:
cus_demos.select('filename').distinct().show(10,False)

+--------------------------------------------------------------------------------------------------------------------------------+
|filename                                                                                                                        |
+--------------------------------------------------------------------------------------------------------------------------------+
|s3://formation-samba-us-west-2-seabright-samba-sources/demographic/v1/2020-04-01/Base_Demografica_Formation_20200402.CSV        |
|s3://formation-samba-us-west-2-seabright-samba-sources/demographic/v1/2020-06-23/Base_Demografica_Formation_202000624.CSV       |
|s3://formation-samba-us-west-2-seabright-samba-sources/demographic/v1/load_dt=2020-09-17/Base_Demografica_Formation_20200917.CSV|
|s3://formation-samba-us-west-2-seabright-samba-sources/demographic/v1/2020-06-10/Base_Demografica_Formation_20200611.CSV        |
+----------------------------------------------------------------------------------

In [23]:
path ="s3://formation-samba-us-west-2-seabright-samba-sources/demographic/v1/load_dt=2020-09-17/Base_Demografica_Formation_20200917.CSV"
cus_demos = spark.read.csv(path,header=True)
hmm = cus_demos.withColumn('load_date',f.split(f.split(f.input_file_name(),'=').getItem(1),'/').getItem(0).cast('date'))

In [50]:
hmm.count()

17729600

In [84]:
 query='''  CREATE TABLE IF NOT EXISTS clean.customer_profile (
       id_customer string ,
     ind_ativo int ,
     ind_cobranded int ,
     clube_ativo string ,
     plano string ,
     tipo_plano string ,
     data_adesao date ,
     app_ativo int ,
     dt_instalacao date ,
     login_app string ,
     dt_login_app string ,
     login_site int ,
     dt_login_site timestamp ,
     preferencia int ,
     dt_preenchimento date ,
     cf_pai string ,
     created_cf_pai string ,
     cf_filho string ,
     created_cf_filho string ,
     estado_civil string ,
     nacionalidade string ,
     data_cadastro date ,
     data_nascimento date ,
     canal_cadastro string ,
     cidade string ,
     pais string ,
     uf string ,
     tier string ,
     sexo string ,
     saldo int ,
     tempo_cadastro int ,
     idade int ,
     regiao string ,
     grupo_engajamento string ,
     renda_final double ,
     rentabilidade double ,
     grupo_rentabilidade string ,
     open7_day string ,
     open15_day string ,
     open30_day string ,
     flag_cell_phone int ,
     flag_email integer ,
     milheiros int ,
     elegiveis int,
     offer_credit_card string,
     offer_radar_smiles string,
     offer_buy_miles string )
    PARTITIONED BY (load_date DATE)
    STORED AS PARQUET
    LOCATION 's3://formation-samba-us-west-2-seabright-samba-derived/clean/customer_profile'
    '''
spark.sql(query)

DataFrame[]

In [59]:
query = '''CREATE DATABASE clean
location  's3://formation-samba-us-west-2-seabright-samba-derived/clean' 
'''
spark.sql(query)

DataFrame[]

In [51]:
hmm.write.format("parquet")\
.mode('append')\
.insertInto("clean.customer_profile")


## Load updated digital actions directly from source

In [98]:
path ="s3://formation-samba-us-west-2-seabright-samba-sources/historical/Full_Table-2020-03-25/Base_Dig_Onboarding_Formation_FULL.csv"
hist_dig_trx = spark.read.csv(path,header=True)

In [97]:
daily_dig_trx = "s3://formation-samba-us-west-2-seabright-samba-sources/digital-action/v1/*/*.csv"
daily_dig_trx = spark.read.csv(daily_dig_trx,header=True,sep=",")

['s3://formation-samba-us-west-2-seabright-samba-sources/digital-action/v1/2020-09-14/Base_Dig_Onboarding_Formation.csv', 's3://formation-samba-us-west-2-seabright-samba-sources/digital-action/v1/2020-09-15/Base_Dig_Onboarding_Formation.csv', 's3://formation-samba-us-west-2-seabright-samba-sources/digital-action/v1/2020-09-16/Base_Dig_Onboarding_Formation.csv']


In [116]:
daily_dig_trx = daily_dig_trx.withColumn('update_date',f.to_timestamp('update_date').cast('date'))
hist_dig_trx = hist_dig_trx.withColumn('update_date',f.to_timestamp('update_date').cast('date'))
hist_dig_trx = hist_dig_trx.filter(f.col('update_date')<f.lit('2020-03-23'))\
              .withColumn('load_date',f.split(f.split(f.input_file_name(),"/").getItem(4),"Full_Table-").getItem(1).cast('date'))

daily_dig_trx = daily_dig_trx.withColumn('load_date',f.split(f.input_file_name(),"/").getItem(5).cast('date'))

In [100]:
hist_dig_trx.select('update_date').distinct().sort(f.col('update_date').desc()).take(4)

[Row(update_date=datetime.date(2020, 3, 24)),
 Row(update_date=datetime.date(2020, 3, 23)),
 Row(update_date=datetime.date(2020, 3, 22)),
 Row(update_date=datetime.date(2020, 3, 21))]

In [102]:
daily_dig_trx.select('update_date').distinct().sort(f.col('update_date')).take(4)

[Row(update_date=datetime.date(2020, 3, 24)),
 Row(update_date=datetime.date(2020, 3, 25)),
 Row(update_date=datetime.date(2020, 3, 26)),
 Row(update_date=datetime.date(2020, 3, 27))]

In [117]:
dig_trx = daily_dig_trx.union(hist_dig_trx).filter(f.col('id_customer')!='')
dig_trx = dig_trx.withColumn('action_flag',f.regexp_replace(f.lower(f.col('action_flag'))," ","_"))\
                .withColumn('action_flag',f.when(f.col('action_flag') == 'filling_in_preferences',f.lit('fill_in_preferences'))
                            .otherwise(f.col('action_flag')))

In [118]:
dig_trx.select('action_flag').distinct().show(100,False)

+-------------------+
|action_flag        |
+-------------------+
|login_app          |
|update_profile     |
|fill_in_preferences|
|download_app       |
|login_site         |
|radar_smiles       |
+-------------------+



In [44]:
# get customer transactions

query = """ select max(load_date) from clean.digital_actions """

max_load_date = str(spark.sql(query).take(1)[0][0])
print(max_load_date)


cmd = "aws s3 ls --recursive  s3://formation-samba-us-west-2-seabright-samba-sources/digital-action/v1/"
wow = subprocess.check_output(cmd.split())
bsn = 's3://formation-samba-us-west-2-seabright-samba-sources/'
daily_trx = [bsn+a.decode('utf-8') for a in wow.split() if a.decode('utf-8').endswith('csv')]
dates = [a.split('v1/')[1].split('/')[0] for a in daily_trx]


ix = dates.index(max_load_date)
trx_append = spark.read.csv(daily_trx[ix+1:],header=True)
trx_append = trx_append\
                    .withColumn('update_date',f.to_timestamp('update_date').cast('date'))\
                    .withColumn('filename',f.input_file_name())\
                     .withColumn('action_flag',f.regexp_replace(f.lower(f.col('action_flag'))," ","_"))\
                    .withColumn('action_flag',f.when(f.col('action_flag') == 'filling_in_preferences',f.lit('fill_in_preferences')))\
                 .withColumn('load_date',f.split(f.input_file_name(),"/").getItem(5).cast('date'))        


2020-09-16


In [46]:
trx_append.groupby().agg(f.max('update_date'),f.min('update_date'),f.min('load_date'),f.max('load_date')).show()

+----------------+----------------+--------------+--------------+
|max(update_date)|min(update_date)|min(load_date)|max(load_date)|
+----------------+----------------+--------------+--------------+
|      2020-09-23|      2020-09-16|    2020-09-17|    2020-09-22|
+----------------+----------------+--------------+--------------+



In [51]:
w = Window.orderBy("update_date")

In [53]:
trx_append.withColumn('lagy', f.lag(f.col('update_date').over(w)))

AnalysisException: "Expression 'update_date#2190' not supported within a window function.;;\nProject [id_customer#2184, update_date#2190, action_flag#2204, filename#2194, load_date#2209, lagy#2269]\n+- Project [id_customer#2184, update_date#2190, action_flag#2204, filename#2194, load_date#2209, _w0#2270, lag(_w0#2270, 1, null) AS lagy#2269]\n   +- Project [id_customer#2184, update_date#2190, action_flag#2204, filename#2194, load_date#2209, _w0#2270]\n      +- Project [id_customer#2184, update_date#2190, action_flag#2204, filename#2194, load_date#2209, _w0#2270, _w0#2270]\n         +- Window [update_date#2190 windowspecdefinition(update_date#2190 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS _w0#2270], [update_date#2190 ASC NULLS FIRST]\n            +- Project [id_customer#2184, update_date#2190, action_flag#2204, filename#2194, load_date#2209]\n               +- Project [id_customer#2184, update_date#2190, action_flag#2204, filename#2194, cast(split(input_file_name(), /)[5] as date) AS load_date#2209]\n                  +- Project [id_customer#2184, update_date#2190, CASE WHEN (action_flag#2199 = filling_in_preferences) THEN fill_in_preferences END AS action_flag#2204, filename#2194]\n                     +- Project [id_customer#2184, update_date#2190, regexp_replace(lower(action_flag#2186),  , _) AS action_flag#2199, filename#2194]\n                        +- Project [id_customer#2184, update_date#2190, action_flag#2186, input_file_name() AS filename#2194]\n                           +- Project [id_customer#2184, cast(to_timestamp('update_date, None) as date) AS update_date#2190, action_flag#2186]\n                              +- Relation[id_customer#2184,update_date#2185,action_flag#2186] csv\n"

In [None]:
query = '''CREATE EXTERNAL TABLE `digital_actions`(
  id_customer string, 
  update_date date, 
  action_flag string)
PARTITIONED BY ( 
  load_date date)
  STORED AS PARQUET
 LOCATION 's3://formation-samba-us-west-2-seabright-samba-derived/clean/customer_profile' '''

spark.sql(query)

In [119]:
dig_trx.write.format("parquet")\
.mode('overwrite')\
.partitionBy('load_date')\
.option("path", 's3://formation-samba-us-west-2-seabright-samba-derived/clean/digital_actions').saveAsTable("clean.digital_actions")


In [132]:
daily_cross_path = "s3://formation-samba-us-west-2-seabright-samba-sources/cross_sell/v1/*/*.csv"
daily_cross = spark.read.csv(daily_cross_path,header=True,sep=",")
daily_cross = daily_cross.withColumn('load_date',f.split(f.input_file_name(),"/").getItem(5).cast('date'))\
            .withColumn('event_date',f.to_timestamp('event_date'))

['s3://formation-samba-us-west-2-seabright-samba-sources/cross_sell/v1/2020-08-29/Base_Cross_Sell_New_Offers.csv', 's3://formation-samba-us-west-2-seabright-samba-sources/cross_sell/v1/2020-08-30/Base_Cross_Sell_New_Offers.csv', 's3://formation-samba-us-west-2-seabright-samba-sources/cross_sell/v1/2020-08-31/Base_Cross_Sell_New_Offers.csv', 's3://formation-samba-us-west-2-seabright-samba-sources/cross_sell/v1/2020-09-01/Base_Cross_Sell_New_Offers.csv', 's3://formation-samba-us-west-2-seabright-samba-sources/cross_sell/v1/2020-09-02/Base_Cross_Sell_New_Offers.csv', 's3://formation-samba-us-west-2-seabright-samba-sources/cross_sell/v1/2020-09-03/Base_Cross_Sell_New_Offers.csv', 's3://formation-samba-us-west-2-seabright-samba-sources/cross_sell/v1/2020-09-04/Base_Cross_Sell_New_Offers.csv', 's3://formation-samba-us-west-2-seabright-samba-sources/cross_sell/v1/2020-09-05/Base_Cross_Sell_New_Offers.csv', 's3://formation-samba-us-west-2-seabright-samba-sources/cross_sell/v1/2020-09-06/Base_C

In [134]:
daily_cross.write.format("parquet")\
.partitionBy('load_date')\
.option("path", 's3://formation-samba-us-west-2-seabright-samba-derived/clean/cross_sell').saveAsTable("clean.cross_sell")

In [None]:
path ="s3://formation-samba-us-west-2-seabright-samba-sources/historical/2020-09-10/Base_Flight_Search_historical.csv"
hist_flight = spark.read.csv(path,header=True)
hist_flight = hist_flight\
.withColumn('load_date',f.split(f.input_file_name(),"/").getItem(4).cast('date'))\
 .withColumn('search_date',f.col('search_date').cast('date'))\
            .withColumn('departure_date',f.col('departure_date').cast('date'))\
            .withColumn('adults',f.col('adults').cast('int'))\
            .withColumn('children',f.col('children').cast('int'))\
                          .withColumn('infants',f.col('infants').cast('int'))

In [None]:
hist_flight.select('search_date').distinct().sort(f.col('search_date').desc()).take(4)

In [141]:
cmd = "aws s3 ls --recursive  s3://formation-samba-us-west-2-seabright-samba-sources/flight_search/"
wow = subprocess.check_output(cmd.split())
bsn = 's3://formation-samba-us-west-2-seabright-samba-sources/'
daily_fl_path = [bsn+a.decode('utf-8') for a in wow.split() if a.decode('utf-8').endswith('csv')][4:]
print(daily_fl_path[-3:])
flight_search = spark.read.csv(daily_fl_path,header=True,sep=",")
flight_search = flight_search.withColumn('load_date',f.split(f.input_file_name(),"/").getItem(5).cast('date'))\
            .withColumn('search_date',f.col('search_date').cast('date'))\
            .withColumn('departure_date',f.col('departure_date').cast('date'))\
            .withColumn('adults',f.col('adults').cast('int'))\
.withColumn('children',f.col('children').cast('int'))\
.withColumn('infants',f.col('infants').cast('int'))\
.filter(f.col('search_date')>f.lit('2020-09-08'))

['s3://formation-samba-us-west-2-seabright-samba-sources/flight_search/v1/2020-09-12/Base_Flight_Search.csv', 's3://formation-samba-us-west-2-seabright-samba-sources/flight_search/v1/2020-09-13/Base_Flight_Search.csv', 's3://formation-samba-us-west-2-seabright-samba-sources/flight_search/v1/2020-09-14/Base_Flight_Search.csv']


In [159]:
flights = flight_search.union(hist_flight).filter(f.col('id_customer')!='')

In [161]:
flights.printSchema()

root
 |-- id_customer: string (nullable = true)
 |-- search_date: date (nullable = true)
 |-- channel: string (nullable = true)
 |-- adults: integer (nullable = true)
 |-- children: integer (nullable = true)
 |-- infants: integer (nullable = true)
 |-- departure_date: date (nullable = true)
 |-- origin_airport: string (nullable = true)
 |-- destination_airport: string (nullable = true)
 |-- origin_airport2: string (nullable = true)
 |-- destination_airport2: string (nullable = true)
 |-- trip_type: string (nullable = true)
 |-- load_date: date (nullable = true)



In [162]:
flights.write.format("parquet")\
.partitionBy('load_date')\
.option("path", 's3://formation-samba-us-west-2-seabright-samba-derived/clean/flight_search').saveAsTable("clean.flight_search")

In [171]:
path ="s3://formation-samba-us-west-2-seabright-samba-sources/historical/Full_Table-2020-03-25/Base_Clube_Conta_Familia_Formation_FULL.CSV"
hist_club = spark.read.csv(path,header=True)
hist_club = hist_club.withColumn('dt_solicitacao',f.to_timestamp('dt_solicitacao'))\
                        .withColumn('load_date',f.split(f.input_file_name(),"/").getItem(5).cast('date'))
                       

hist_club.select('dt_solicitacao').sort(f.col('dt_solicitacao').desc()).take(2)

In [176]:
daily_club_path = "s3://formation-samba-us-west-2-seabright-samba-sources/club-membership/v1/*/*.[csv|CSV]"
daily_club = spark.read.csv(daily_club_path,header=True,sep=",")
daily_club = daily_club.withColumn('dt_solicitacao',f.to_timestamp('dt_solicitacao'))\
                        .withColumn('load_date',f.split(f.input_file_name(),"/").getItem(5).cast('date'))\
                        .filter(f.col('dt_solicitacao') > f.lit('2020-03-24'))
                        

['s3://formation-samba-us-west-2-seabright-samba-sources/club-membership/v1/2020-09-14/Base_Clube_Conta_Familia_Formation.csv', 's3://formation-samba-us-west-2-seabright-samba-sources/club-membership/v1/2020-09-15/Base_Clube_Conta_Familia_Formation.csv', 's3://formation-samba-us-west-2-seabright-samba-sources/club-membership/v1/2020-09-16/Base_Clube_Conta_Familia_Formation.csv']


In [177]:
club = daily_club.union(hist_club).filter(f.col('id_customer')!='')

In [178]:
club.write.format("parquet")\
.partitionBy('load_date')\
.option("path", 's3://formation-samba-us-west-2-seabright-samba-derived/clean/club_membership')\
.saveAsTable("clean.club_membership")