In [25]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import Window, types

# One Spark session, reused by the read cells further down.
spark = (
    SparkSession.builder
    .appName("etl")
    .getOrCreate()
)

In [2]:
def cast_columns(df, columns):
    """Cast each listed column of ``df`` to a freshly constructed Spark type.

    Args:
        df: DataFrame whose columns are cast.
        columns: mapping of column name -> (type_factory, kwargs). The target
            type is ``type_factory(**kwargs)``, e.g.
            ``(types.DecimalType, {'precision': 10, 'scale': 2})``.

    Returns:
        A DataFrame in which every listed column is replaced by its cast version.
    """
    result = df
    for name, (type_factory, type_kwargs) in columns.items():
        target_type = type_factory(**type_kwargs)
        result = result.withColumn(name, result[name].cast(target_type))
    return result

def parse_json_columns(df, columns, json_columns):
    """Parse JSON-string columns and flatten their fields into top-level columns.

    For each entry of ``json_columns``, the string column is parsed with
    ``from_json`` and then replaced by the fields of the resulting struct. The
    fields are appended after the frame's remaining columns.

    Args:
        df: DataFrame containing the JSON-string columns.
        columns: no longer used for column selection; kept so existing call
            sites keep working. Selection follows the frame's current columns,
            so columns missing from this mapping are not dropped, and several
            JSON columns can be expanded in one call.
        json_columns: mapping of JSON column name -> ``StructType`` schema.

    Returns:
        The DataFrame with every JSON column replaced by its struct fields.
    """
    for json_col_name, schema in json_columns.items():
        parsed = df.withColumn(json_col_name, from_json(json_col_name, schema))
        # Take the surviving columns from the current frame, not from `columns`:
        # after the first expansion the frame no longer matches that mapping.
        kept = [col(name) for name in parsed.columns if name != json_col_name]
        df = parsed.select(*kept, col(f'{json_col_name}.*'))
    return df

def remap_columns(df, columns):
    """Replace values column by column using per-column lookup tables.

    Args:
        df: DataFrame to transform.
        columns: mapping of column name -> {old_value: new_value}. Each table is
            applied with ``DataFrame.replace`` restricted to that column only.

    Returns:
        The DataFrame after every replacement has been applied.
    """
    remapped = df
    for target, lookup in columns.items():
        # subset confines the replacement to this single column.
        remapped = remapped.replace(to_replace=lookup, subset=[target])
    return remapped

In [3]:
# Options shared by every CSV read in this notebook (the source files are ';'-separated).
csv_params = {
    'header': True,
    'inferSchema': True,
    'encoding': 'utf-8',
    'sep': ';',
}

users_df = spark.read.csv('../data/usuarios.csv', **csv_params)

# Target type per raw column, as (DataType class, constructor kwargs) for cast_columns.
users_columns = {
    # IntegerType to match funnel_columns['user_id']. The users and funnel frames are
    # joined on this key; a string/integer mismatch would rely on implicit casting.
    'user_id': (types.IntegerType, {}),
    'dados_pessoais': (types.StringType, {}),
    'nivel_de_risco': (types.IntegerType, {}),
    'objetivo': (types.StringType, {}),
    'perfil_de_risco': (types.StringType, {}),
    'fez_adicional': (types.BooleanType, {}),
    'fez_resgate_parcial': (types.BooleanType, {}),
    'fez_resgate_total': (types.BooleanType, {}),
    'poupanca': (types.DecimalType, {'precision': 10, 'scale': 2}),
    'renda_fixa': (types.DecimalType, {'precision': 10, 'scale': 2}),
    'renda_variavel': (types.DecimalType, {'precision': 10, 'scale': 2}),
}

# Schema of the JSON string stored in dados_pessoais. idade is parsed as a string here
# and cast to an integer in a later cell.
users_json_columns = {
    'dados_pessoais': types.StructType([
                          types.StructField('genero', types.StringType(), True),
                          types.StructField('estado_civil', types.StringType(), True),
                          types.StructField('idade', types.StringType(), True),
                      ]),
}


# Collapse the married / civil-union variants of estado_civil into a single 'CASADO(A)'.
users_remap_columns = {
    'estado_civil': {
        'CASADO(A) COM BRASILEIRO(A) NATO(A)': 'CASADO(A)',
        'CASADO(A) COM BRASILEIRO(A) NATURALIZADO(A)': 'CASADO(A)',
        'UNIAO ESTAVEL': 'CASADO(A)',
    }
}

users_df.show()

+-------+--------------------+--------------+-----------------+---------------+-------------+-------------------+-----------------+---------+----------+--------------+
|user_id|      dados_pessoais|nivel_de_risco|         objetivo|perfil_de_risco|fez_adicional|fez_resgate_parcial|fez_resgate_total| poupanca|renda_fixa|renda_variavel|
+-------+--------------------+--------------+-----------------+---------------+-------------+-------------------+-----------------+---------+----------+--------------+
|      1|{"genero": "male"...|             3|     build_wealth|         medium|            1|                  0|                0| 100000.0|      null|          null|
|      2|{"genero": "femal...|             4|     build_wealth|            low|            1|                  1|                0| 190000.0|  190000.0|          null|
|      3|{"genero": "male"...|             5|     build_wealth|         medium|            1|                  0|                0|     null|    1200.0|        

In [4]:
# Apply the target types declared in users_columns.
users_df = cast_columns(df=users_df, columns=users_columns)
users_df.show()

+-------+--------------------+--------------+-----------------+---------------+-------------+-------------------+-----------------+----------+----------+--------------+
|user_id|      dados_pessoais|nivel_de_risco|         objetivo|perfil_de_risco|fez_adicional|fez_resgate_parcial|fez_resgate_total|  poupanca|renda_fixa|renda_variavel|
+-------+--------------------+--------------+-----------------+---------------+-------------+-------------------+-----------------+----------+----------+--------------+
|      1|{"genero": "male"...|             3|     build_wealth|         medium|         true|              false|            false| 100000.00|      null|          null|
|      2|{"genero": "femal...|             4|     build_wealth|            low|         true|               true|            false| 190000.00| 190000.00|          null|
|      3|{"genero": "male"...|             5|     build_wealth|         medium|         true|              false|            false|      null|   1200.00|  

In [5]:
# Expand the dados_pessoais JSON string into genero / estado_civil / idade columns.
users_df = parse_json_columns(df=users_df, columns=users_columns, json_columns=users_json_columns)
users_df.show()

+-------+--------------+-----------------+---------------+-------------+-------------------+-----------------+----------+----------+--------------+------+--------------------+-----+
|user_id|nivel_de_risco|         objetivo|perfil_de_risco|fez_adicional|fez_resgate_parcial|fez_resgate_total|  poupanca|renda_fixa|renda_variavel|genero|        estado_civil|idade|
+-------+--------------+-----------------+---------------+-------------+-------------------+-----------------+----------+----------+--------------+------+--------------------+-----+
|      1|             3|     build_wealth|         medium|         true|              false|            false| 100000.00|      null|          null|  male|CASADO(A) COM BRA...|   40|
|      2|             4|     build_wealth|            low|         true|               true|            false| 190000.00| 190000.00|          null|female|CASADO(A) COM BRA...|   38|
|      3|             5|     build_wealth|         medium|         true|              fals

In [6]:
# idade comes out of the JSON as a string; cast it to an integer.
new_columns = {'idade': (types.IntegerType, {})}
users_df = cast_columns(df=users_df, columns=new_columns)
users_df.show()

+-------+--------------+-----------------+---------------+-------------+-------------------+-----------------+----------+----------+--------------+------+--------------------+-----+
|user_id|nivel_de_risco|         objetivo|perfil_de_risco|fez_adicional|fez_resgate_parcial|fez_resgate_total|  poupanca|renda_fixa|renda_variavel|genero|        estado_civil|idade|
+-------+--------------+-----------------+---------------+-------------+-------------------+-----------------+----------+----------+--------------+------+--------------------+-----+
|      1|             3|     build_wealth|         medium|         true|              false|            false| 100000.00|      null|          null|  male|CASADO(A) COM BRA...|   40|
|      2|             4|     build_wealth|            low|         true|               true|            false| 190000.00| 190000.00|          null|female|CASADO(A) COM BRA...|   38|
|      3|             5|     build_wealth|         medium|         true|              fals

In [7]:
# Normalise the estado_civil categories according to users_remap_columns.
users_df = remap_columns(df=users_df, columns=users_remap_columns)
users_df.show()

+-------+--------------+-----------------+---------------+-------------+-------------------+-----------------+----------+----------+--------------+------+-------------+-----+
|user_id|nivel_de_risco|         objetivo|perfil_de_risco|fez_adicional|fez_resgate_parcial|fez_resgate_total|  poupanca|renda_fixa|renda_variavel|genero| estado_civil|idade|
+-------+--------------+-----------------+---------------+-------------+-------------------+-----------------+----------+----------+--------------+------+-------------+-----+
|      1|             3|     build_wealth|         medium|         true|              false|            false| 100000.00|      null|          null|  male|    CASADO(A)|   40|
|      2|             4|     build_wealth|            low|         true|               true|            false| 190000.00| 190000.00|          null|female|    CASADO(A)|   38|
|      3|             5|     build_wealth|         medium|         true|              false|            false|      null|   1

In [8]:
# Missing balances count as zero; otherwise the sum below would be null for those users.
users_df = users_df.fillna(0, [
    'poupanca',
    'renda_fixa',
    'renda_variavel',
])

# Recurring investor: made an additional contribution and no partial or total withdrawal.
users_df = users_df.withColumn(
        'flag_investidor_recorrente',
        col('fez_adicional') & ~(col('fez_resgate_parcial') | col('fez_resgate_total'))
)

users_df = users_df.withColumn(
        'investimentos_externos',
        col('poupanca') + col('renda_fixa') + col('renda_variavel')
)

users_df.show()

# index=False: the pandas RangeIndex carries no information. Without it, the index
# would be written as an extra unnamed leading column.
users_df.toPandas().to_csv('../data/refined_zone/users.csv', header=True, sep=';', index=False)

+-------+--------------+-----------------+---------------+-------------+-------------------+-----------------+----------+----------+--------------+------+-------------+-----+--------------------------+----------------------+
|user_id|nivel_de_risco|         objetivo|perfil_de_risco|fez_adicional|fez_resgate_parcial|fez_resgate_total|  poupanca|renda_fixa|renda_variavel|genero| estado_civil|idade|flag_investidor_recorrente|investimentos_externos|
+-------+--------------+-----------------+---------------+-------------+-------------------+-----------------+----------+----------+--------------+------+-------------+-----+--------------------------+----------------------+
|      1|             3|     build_wealth|         medium|         true|              false|            false| 100000.00|      0.00|          0.00|  male|    CASADO(A)|   40|                      true|             100000.00|
|      2|             4|     build_wealth|            low|         true|               true|        

In [93]:
# Raw funnel events, read with the same CSV options as the users file.
funnel_df = spark.read.csv(path='../data/funil.csv', **csv_params)
funnel_df.show()

+-------+-------------------+-------------+--------------+
|user_id|          timestamp|       evento|valor_simulado|
+-------+-------------------+-------------+--------------+
|      1|2017-01-06 11:40:25|     Homepage|          null|
|      1|2017-01-29 21:30:56|     Homepage|          null|
|      1|2017-01-29 21:40:04|     Homepage|          null|
|      1|2017-01-29 21:40:04|     Homepage|          null|
|      1|2017-08-01 18:40:27|     Homepage|          null|
|      1|2017-08-03 11:18:19|     Homepage|          null|
|      1|2017-08-03 14:08:16|     Homepage|          null|
|      1|2017-08-08 14:30:37|     Homepage|          null|
|      1|2017-08-15 16:47:01|Questionnaire|        120000|
|      1|2017-08-15 17:27:11|Questionnaire|        120000|
|      1|2017-08-15 17:27:58|Questionnaire|        120000|
|      1|2017-08-15 17:40:30|Questionnaire|        300000|
|      1|2017-08-15 21:15:12|     Homepage|          null|
|      1|2017-08-15 21:29:36|Questionnaire|        30000

In [98]:
funnel_columns = {
    'user_id': (types.IntegerType, {}),
    'timestamp': (types.TimestampType, {}),
    'evento': (types.StringType, {}),
    'valor_simulado': (types.DecimalType, {'precision': 10, 'scale': 2}),
}

funnel_df = cast_columns(funnel_df, funnel_columns)

events_by_user = Window.partitionBy('user_id')
# Null and non-null simulated values are ordered separately, so the non-null rows
# get their own positions.
values_by_user = Window.partitionBy('user_id', col('valor_simulado').isNull())

# Descriptive ordering columns. rank() gives tied timestamps the same position
# (the data contains duplicated events), so these columns can repeat values.
funnel_df = funnel_df.withColumn(
        'ordem_evento',
        rank().over(events_by_user.orderBy('timestamp'))
)
funnel_df = funnel_df.withColumn(
        'ordem_inversa_evento',
        rank().over(events_by_user.orderBy(desc('timestamp')))
)
funnel_df = funnel_df.withColumn(
        'ordem_valor_simulado',
        rank().over(values_by_user.orderBy(desc('timestamp')))
)

# The two flags select the single row per user that is joined onto users_df. A
# rank() == 1 test would flag every row tied on the first/last timestamp and fan
# out that join. row_number() with explicit tie-breakers flags exactly one row,
# deterministically.
funnel_df = funnel_df.withColumn(
        'primeiro_evento',
        row_number().over(events_by_user.orderBy('timestamp', 'evento', 'valor_simulado')) == 1
)
# Among simulations tied on the latest timestamp, the highest value wins.
# NOTE(review): this tie-break choice is arbitrary — confirm it with the data owner.
funnel_df = funnel_df.withColumn(
        'ultimo_valor_simulado',
        col('valor_simulado').isNotNull()
        & (row_number().over(values_by_user.orderBy(desc('timestamp'), desc('valor_simulado'))) == 1)
)

funnel_df.show()

# index=False: don't write the pandas RangeIndex as an extra unnamed leading column.
funnel_df.toPandas().to_csv('../data/refined_zone/funnel.csv', header=True, sep=';', index=False)

+-------+-------------------+-------------+--------------+------------+--------------------+--------------------+---------------+---------------------+
|user_id|          timestamp|       evento|valor_simulado|ordem_evento|ordem_inversa_evento|ordem_valor_simulado|primeiro_evento|ultimo_valor_simulado|
+-------+-------------------+-------------+--------------+------------+--------------------+--------------------+---------------+---------------------+
|     31|2018-02-15 23:42:04|Questionnaire|       5000.00|           9|                   1|                   1|          false|                 true|
|     31|2017-08-10 17:07:16|Questionnaire|      50000.00|           8|                   2|                   2|          false|                false|
|     31|2017-08-10 17:06:53|Questionnaire|      50000.00|           7|                   3|                   3|          false|                false|
|     31|2017-08-10 17:06:47|Questionnaire|      50000.00|           6|                 

In [99]:
# Attach each user's first funnel event (alias f1) and latest simulated value (alias f2).
join = (
    users_df
    .join(funnel_df.filter(col('primeiro_evento')).alias('f1'), on=['user_id'], how='left')
    .join(funnel_df.filter(col('ultimo_valor_simulado')).alias('f2'), on=['user_id'], how='left')
)
join.show()

+-------+--------------+-----------------+---------------+-------------+-------------------+-----------------+----------+----------+--------------+------+-------------+-----+--------------------------+----------------------+-------------------+-------------+--------------+------------+--------------------+--------------------+---------------+---------------------+-------------------+-------------+--------------+------------+--------------------+--------------------+---------------+---------------------+
|user_id|nivel_de_risco|         objetivo|perfil_de_risco|fez_adicional|fez_resgate_parcial|fez_resgate_total|  poupanca|renda_fixa|renda_variavel|genero| estado_civil|idade|flag_investidor_recorrente|investimentos_externos|          timestamp|       evento|valor_simulado|ordem_evento|ordem_inversa_evento|ordem_valor_simulado|primeiro_evento|ultimo_valor_simulado|          timestamp|       evento|valor_simulado|ordem_evento|ordem_inversa_evento|ordem_valor_simulado|primeiro_evento|ultim

In [100]:
# Final one-row-per-user view. homepage holds the year-month of the user's first funnel event.
result = (
    join
    .select(
        'user_id', 'genero', 'estado_civil', 'idade', 'nivel_de_risco',
        'objetivo', 'perfil_de_risco',
        date_format('f1.timestamp', 'yyyy-MM').alias('homepage'),
        col('f2.valor_simulado').alias('valor_simulado'),
        'flag_investidor_recorrente', 'investimentos_externos',
    )
    .distinct()
)
result.show()

+-------+------+------------+-----+--------------+-----------------+---------------+--------+--------------+--------------------------+----------------------+
|user_id|genero|estado_civil|idade|nivel_de_risco|         objetivo|perfil_de_risco|homepage|valor_simulado|flag_investidor_recorrente|investimentos_externos|
+-------+------+------------+-----+--------------+-----------------+---------------+--------+--------------+--------------------------+----------------------+
|     70|  male| SOLTEIRO(A)|   39|             1|     build_wealth|            low| 2017-01|    2000000.00|                      true|              22000.00|
|     62|  male|   CASADO(A)|   61|             3|     build_wealth|           high| 2017-05|      15000.00|                     false|              16000.00|
|     35|  male|   CASADO(A)|   42|             4|     build_wealth|           high| 2017-02|      10000.00|                     false|             100000.00|
|     57|female|   CASADO(A)|   39|           

In [103]:
# index=False: don't write the pandas RangeIndex as an extra unnamed leading column.
result.toPandas().to_csv('../data/ipynb_result.csv', header=True, sep=';', index=False)