In [1]:
from pyspark.sql import SparkSession, DataFrame
from datetime import datetime, timedelta
from pyspark.sql.types import *
from pyspark.sql import functions as F
from google.cloud import storage
from functools import reduce
import re

In [2]:
CURRENT_DATE_ARG = None

In [3]:
ORIGINS = ["bot_social", "bot_ativo", "bot_crm", "bot_midia", "bot_alpha", "bot_comparador", "bot_crm_2", "bot_social_2"]
BUCKET_NAME = "tim-ultrafibra-gcs-sp"
CURRENT_DATE = datetime.strptime(CURRENT_DATE_ARG, '%Y-%m-%dT%H:%M:%S') if CURRENT_DATE_ARG is not None else datetime.today() 
ORIGIN_ID = {
  "bot_social": "08009425040",
  "bot_ativo": "8001262276",
  "bot_crm": "08009419620",
  "bot_midia": "8008002003",
  "bot_alpha": "8009419679",
  "bot_comparador": "8009419612",
  "bot_crm_2": "08009419578",
  "bot_social_2": "08009425111"
}

In [4]:
BRAZILIAN_TIMEDIFF = timedelta(hours=3)
CURRENT_DATE = CURRENT_DATE - BRAZILIAN_TIMEDIFF

In [5]:
def is_midnight_hour(hour):
  return hour >= 0 and hour <= 4

if is_midnight_hour(CURRENT_DATE.hour):
  CURRENT_DATE = CURRENT_DATE - timedelta(days=1)
  CURRENT_DATE = CURRENT_DATE.replace(hour=23, minute=59, second=59)

In [6]:
spark = SparkSession.builder.appName("tim_ultrafibra_raw_to_curated")\
  .config("spark.sql.caseSensitive", "True")\
  .config("spark.sql.session.timeZone", "America/Sao_Paulo")\
  .getOrCreate()

gcsClient = storage.Client()

In [7]:
def to_snake_case(input_string):
  input_string = input_string.replace(" ", "_").replace("-", "_")
  snake_case_string = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', input_string)
  return snake_case_string.lower()

def sanitize_column_name(text):
  text = to_snake_case(text).replace("/", "_")
  text = re.sub(r"_\d+(\.\d?)*_", "_", text, count=1)
  return text

def create_column_name(df: DataFrame, list_columns: list) -> DataFrame:
  for col_name in list_columns:
    if col_name not in df.columns:
      df = df.withColumn(col_name, F.lit(None).cast(StringType()))
  return df

def rename_columns(df: DataFrame, list_step_not_identified: list) -> DataFrame:
  step_pattern = re.compile(r"^\d.*\.\d?\s")
  for col_name in df.columns:
    if step_pattern.match(col_name) or col_name in list_step_not_identified:
      if "Etapa" not in col_name:
        new_column_name = sanitize_column_name(f"etapa_{col_name}")
      else:
        new_column_name = sanitize_column_name(col_name)
      if new_column_name in df.columns:
        df = df.withColumn(new_column_name, F.col(new_column_name) | F.when(F.col(f"`{col_name}`") == "1", True).otherwise(False))\
          .drop(col_name)
      else:
        df = df.withColumn(new_column_name, F.when(F.col(f"`{col_name}`") == "1", True).otherwise(False))\
          .drop(col_name)
    else:
      df = df.withColumnRenamed(col_name, sanitize_column_name(col_name))
  return df

In [8]:
def get_path_in_raw_zone(datetime: datetime, origin: str):
  return f"gs://{BUCKET_NAME}/raw-zone/{origin}/{datetime.strftime('%Y%m')}/{datetime.strftime('%Y%m%d-%H%M%S')}.parquet"

def get_path_in_curated_zone(datetime: datetime, origin: str):
  return f"gs://{BUCKET_NAME}/curated-zone/{origin}/{datetime.strftime('%Y%m')}/{datetime.strftime('%Y%m%d')}.parquet"

def get_prefix_in_raw_zone(datetime: datetime, origin: str):
  return f"raw-zone/{origin}/{datetime.strftime('%Y%m')}/{datetime.strftime('%Y%m%d')}"

def unify_columns(df: DataFrame, columns: list):
  existing_columns = [col for col in columns if col in df.columns]
  existing_columns.sort(key=lambda col: df.where(F.col(col).isNotNull()).count(), reverse=True)
  if len(existing_columns) == 0:
    return F.lit(None).cast(StringType())
  return F.coalesce(*existing_columns)

def convert_empty_string_to_null(df: DataFrame, columns: list):
  supported_types = [StringType(), IntegerType(), DoubleType(), FloatType(), LongType(), ShortType(), ByteType(), BooleanType(), DecimalType()]
  for column_name in columns:
    column_type = df.schema[column_name].dataType
    if column_type not in supported_types:
      column_type = StringType()
    df = df.withColumn(column_name, F.when(F.col(column_name) == "null", F.lit(None).cast(column_type)).otherwise(F.col(column_name)))
  return df

def reduce_dataframe(dfs: list):
  if len(dfs) == 0:
    return spark.createDataFrame([], StringType())
  elif len(dfs) == 1:
    return dfs[0]
  else:
    return reduce(lambda df1, df2: df1.unionByName(df2, allowMissingColumns=True), dfs)

In [9]:
bucket = gcsClient.get_bucket(BUCKET_NAME)

blobs_per_origin = {}

for origin in ORIGINS:
  blobs = bucket.list_blobs(prefix=get_prefix_in_raw_zone(CURRENT_DATE, origin))
  blobs_per_origin[origin] = [f"gs://{BUCKET_NAME}/{blob.name}" for blob in blobs if blob.name.endswith(".parquet/")]

In [10]:
columns_to_drop = [
  'apiwci',
  'Chat Messages From Origin',
  'Combinacao',
  'OptInMensagemAtiva',
  'responseAtualiza',
  'responseDadosPessoais',
  'responseEndereco',
  'responseFatura',
  'responseSV',
  'responseVia',
  'responseendereco1',
  'responseviabilidade',
  'statusAtualiza',
  'statusDadosPessoais',
  'statusEndereco',
  'statusFatura',
  'statusSV',
  'statusVia',
  'statusendereco1',
  'statusviabilidade',
  'dataAgendamento1',
  'dataAgendamento2',
  'lastMessageDate',
  'Primeira Mensagem',
  'Primeira mensagem',
  'primeiraMensagem',
  'Primeira_mensagem',
  'responseatualiza',
  'SourceId',
  'SourceTitle',
  'SourceType',
  'SourceUrl',
  'utmMedium',
  'UtmMedium',
  'utmSource',
  'UtmSource',
  'utmCampaign',
  'UtmCampaign',
  'utmCampaign2'
]

In [11]:
list_date_cols = ['dataAgendamento1', 'dataAgendamento2', 'dataAgendamento3', 'data1', 'data2', 'data3']

In [12]:
columns_create_step = ["data_agendamento_1", "data_agendamento_2", "data_agendamento_3", "Retomada"]

In [13]:
dataframes_per_origin = {}

for origin in ORIGINS:
  dataframe_list = []
  for blob_path in blobs_per_origin[origin]:
    df = spark.read.parquet(blob_path)

    df = create_column_name(df, list_date_cols)

    df = df.withColumn('utm_campaign', unify_columns(df, ['utmCampaign', 'UtmCampaign', 'utmCampaign2']))
    df = df.withColumn('utm_medium', unify_columns(df, ['utmMedium', 'UtmMedium']))
    df = df.withColumn('tipo', F.col('utmMedium') if "utmMedium" in df.columns else F.lit(None).cast(StringType()))
    df = df.withColumn('utm_source', unify_columns(df, ['utmSource', 'UtmSource']))
    df = df.withColumn('source_id', unify_columns(df, ['source_id', 'SourceId']))
    df = df.withColumn('source_type', unify_columns(df, ['source_type', 'SourceType']))
    df = df.withColumn('source_url', unify_columns(df, ['source_url', 'SourceUrl']))
    df = df.withColumn('data_agendamento_1', F.when(F.trim(F.col('data1')).isNull() | (F.trim(F.col('data1')) == ""), F.col('dataAgendamento1')).otherwise(F.col('data1')))
    df = df.withColumn('data_agendamento_2', F.when(F.trim(F.col('data2')).isNull() | (F.trim(F.col('data2')) == ""), F.col('dataAgendamento2')).otherwise(F.col('data2')))
    df = df.withColumn('data_agendamento_3', F.when(F.trim(F.col('data3')).isNull() | (F.trim(F.col('data3')) == ""), F.col('dataAgendamento3')).otherwise(F.col('data3')))
    df = df.withColumn('primeira_mensagem1', unify_columns(df, ['Primeira Mensagem', 'Primeira mensagem', 'primeiraMensagem', 'Primeira_mensagem']))
    df = df.withColumn('num_bot', F.lit(ORIGIN_ID[origin]))\
      .withColumnRenamed("SourceTitle", "source_title")\
      .withColumnRenamed("planoDescription", "plan_description")\
      .withColumnRenamed("lastMessageDate", "last_message_date")\
      .withColumnRenamed("city", "cidade")\
      .drop(*columns_to_drop)
    df = df.withColumnRenamed("primeira_mensagem1", "primeira_mensagem")

    dataframe_list.append(df)

  final_dataframe = reduce_dataframe(dataframe_list)
  final_dataframe = rename_columns(final_dataframe, columns_create_step)

  dataframes_per_origin[origin] = final_dataframe

In [14]:
step_columns = {
  'etapa_cep': 1,
  'etapa_cep_erro': 2,
  'etapa_cep_invalido': 3,
  'etapa_cep_invalido_erro': 4,
  'etapa_num_casa': 5,
  'etapa_num_casa_erro': 6,
  'etapa_num_casa_invalido': 7,
  'etapa_num_casa_invalido_erro': 8,
  'etapa_exibe_planos': 9,
  'etapa_exibe_planos_erro': 10,
  'etapa_saber_mais': 11,
  'etapa_email': 12,
  'etapa_email_erro': 13,
  'etapa_email_invalido': 14,
  'etapa_email_invalido_erro': 15,
  'etapa_cpf': 16,
  'etapa_cpf_erro': 17,
  'etapa_cpf_invalido': 18,
  'etapa_cpf_invalido_erro': 19,
  'etapa_nome': 20,
  'etapa_nome_mae': 21,
  'etapa_data_nascimento': 22,
  'etapa_logradouro': 23,
  'etapa_bairro': 24,
  'etapa_complemento': 25,
  'etapa_confirma_endereco': 26,
  'etapa_confirma_endereco_erro': 27,
  'etapa_altera_endereco': 28,
  'etapa_altera_endereco_sv': 29,
  'etapa_vencimento': 30,
  'etapa_vencimento_erro': 31,
  'etapa_vencimento_invalido': 32,
  'etapa_tipo_fatura': 33,
  'etapa_tipo_fatura_erro': 34,
  'etapa_tipo_fatura_invalido': 35,
  'etapa_tipo_fatura_invalido_erro': 36,
  'etapa_banco': 37,
  'etapa_banco_erro': 38,
  'etapa_banco_invalido': 39,
  'etapa_banco_invalido_erro': 40,
  'etapa_agencia': 41,
  'etapa_agencia_erro': 42,
  'etapa_agencia_invalida': 43,
  'etapa_agencia_invalida_erro': 44,
  'etapa_conta': 45,
  'etapa_conta_erro': 46,
  'etapa_conta_invalida': 47,
  'etapa_data_de_agendamento_1': 48,
  'etapa_erro': 49,
  'etapa_invalida': 50,
  'etapa_invalida_erro': 51,
  'etapa_data_de_agendamento_2': 52,
  'etapa_erro': 53,
  'etapa_invalida': 54,
  'etapa_data_de_agendamento_3': 55,
  'etapa_data_de_agendamento_3_erro': 56,
  'etapa_data_de_agendamento_3_invalida': 57,
  'etapa_data_de_agendamento_3_invalida_erro': 58,
  'etapa_opt_in': 59,
  'etapa_opt_in_erro': 60,
  'etapa_finalizacao': 61,
  'etapa_abandono_1': 62,
  'etapa_abandono_2': 63,
  'etapa_abandono_3': 64,
  'etapa_abandono_5': 65,
  'etapa_retomada': 66,
}

step_order = {k: v for k, v in sorted(step_columns.items(), key=lambda item: item[1])}


In [15]:
valid_columns = [
  'application_identifier', 'auto_sku', 'bairro', 'broadcast_list_id',
  'broadcast_origem', 'campaign_id', 'campaign_message_template', 'campaign_originator',
  'canal', 'carrinho_abandonado', 'cep', 'cidade', 'complemento', 'contratoda', 'contratofa', 'cpf',
  'data_agendamento_1', 'data_agendamento_2', 'data_agendamento_3', 'data_nascimento',
  'ddd', 'diferenca_preco', 'email', 'fatura_cliente', 'identificador_pedido', 'identity',
  'last_message_date', 'logradouro', 'name', 'nome_mae', 'num_agencia', 'num_bot',
  'num_conta', 'num_endereco', 'pedido_finalizado_fixa', 'pedido_finalizado_fixa_hora', 'phone_number',
  'plan_description', 'plan_name', 'portabilidade', 'preco_base', 'preco_boleto', 'price',
  'primeira_mensagem', 'produto', 'retomada', 'source', 'source_id',
  'source_type', 'source_url', 'step', 'step_abandono', 'suporte',
  'tag', 'tel_abandono', 'tipo', 'tipo_fatura', 'uf', 'utm_campaign',
  'utm_medium', 'utm_source', 'utm_term', 'vencimento_fatura'
]

In [16]:
dataframes_from_all_origins = []

for origin in ORIGINS:
  if origin in dataframes_per_origin:
    df = dataframes_per_origin[origin]

    non_existing_step_columns = [col for col in step_columns.keys() if col not in df.columns]
    non_step_columns = [col for col in df.columns if col not in step_columns.keys() and col in valid_columns]

    for col in non_existing_step_columns:
      df = df.withColumn(col, F.lit(False))

    final_dataframe = df.select(non_step_columns + list(step_order.keys()))
    dataframes_from_all_origins.append(final_dataframe)
unified_dataframe = reduce_dataframe(dataframes_from_all_origins)
unified_dataframe = convert_empty_string_to_null(unified_dataframe, unified_dataframe.columns)
unified_dataframe.write.mode("overwrite").parquet(get_path_in_curated_zone(CURRENT_DATE, "bot_tim"))