In [61]:
from calendar import mdays

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import from_json, col, isnull, when, array_contains, max, last
from pyspark.sql.types import StructType, StructField, StringType


In [4]:
spark = SparkSession.builder \
    .appName("ifood-jsons") \
    .getOrCreate()

In [3]:
spark.sparkContext.setLogLevel("ERROR")

### Tratamento do transactions

In [55]:
file_location = "/Users/alyssonamaral/Documents/ifood-case/case_data/transactions.json"
file_type = "json"

infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

df_transactions = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

df_transactions = df_transactions \
    .withColumn("amount", col("value.amount")) \
    .withColumn("offer_id_", col("value.`offer id`")) \
    .withColumn("offer_id", col("value.offer_id")) \
    .withColumn("reward", col("value.reward")) \
    .drop("value")

df_transactions.show(5)

                                                                                

+--------------------+--------------+---------------------+------+--------------------+--------+------+
|          account_id|         event|time_since_test_start|amount|           offer_id_|offer_id|reward|
+--------------------+--------------+---------------------+------+--------------------+--------+------+
|78afa995795e4d85b...|offer received|                  0.0|  NULL|9b98b8c7a33c4b65b...|    NULL|  NULL|
|a03223e636434f42a...|offer received|                  0.0|  NULL|0b1e1539f2cc45b7b...|    NULL|  NULL|
|e2127556f4f64592b...|offer received|                  0.0|  NULL|2906b810c7d441179...|    NULL|  NULL|
|8ec6ce2a7e7949b1b...|offer received|                  0.0|  NULL|fafdcd668e3743c1b...|    NULL|  NULL|
|68617ca6246f4fbc8...|offer received|                  0.0|  NULL|4d5c57ea9a6940dd8...|    NULL|  NULL|
+--------------------+--------------+---------------------+------+--------------------+--------+------+
only showing top 5 rows



                                                                                

In [56]:
df_transactions.count()

                                                                                

306534

In [49]:
df_transactions.select("event").distinct().show()


+---------------+
|          event|
+---------------+
|    transaction|
| offer received|
|offer completed|
|   offer viewed|
+---------------+



In [60]:
from pyspark.sql.functions import sum, when, col

# Group by the 'event' column and count the null values in the 'amount' column
null_amount_counts = df_transactions.groupBy("event").agg(
    sum(when(col("amount").isNull(), 1).otherwise(0)).alias("null_amount_count")
)

# Show the resulting DataFrame
null_amount_counts.show()



+---------------+-----------------+
|          event|null_amount_count|
+---------------+-----------------+
|    transaction|                0|
| offer received|            76277|
|offer completed|            33579|
|   offer viewed|            57725|
+---------------+-----------------+



                                                                                

In [62]:
window_spec = Window.partitionBy("account_id").orderBy("time_since_test_start").rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_transactions = df_transactions.withColumn(
    "last_transaction_amount",
    last(when(col("event") == "transaction", col("amount")), ignorenulls=True).over(window_spec)
)

df_transactions = df_transactions.withColumn(
    "amount",
    when(col("event") == "transaction", col("amount"))
    .otherwise(col("last_transaction_amount"))
)

In [63]:
null_amount_counts = df_transactions.groupBy("event").agg(
    sum(when(col("amount").isNull(), 1).otherwise(0)).alias("null_amount_count")
)

# Show the resulting DataFrame
null_amount_counts.show()



+---------------+-----------------+
|          event|null_amount_count|
+---------------+-----------------+
|    transaction|                0|
| offer received|            20952|
|offer completed|                0|
|   offer viewed|            14479|
+---------------+-----------------+



                                                                                

In [64]:
df_transactions = df_transactions.filter(~(col("event").isin("offer_received", "offer_completed")))

In [65]:
df_transactions = df_transactions \
    .withColumn("offer_id", when(col("offer_id").isNull(), col("offer_id_")).otherwise(col("offer_id"))) \
    .drop("offer_id_")

In [68]:
df_aggregated = df_transactions.groupBy("account_id").agg(
    last("amount").alias("amount"),
    last("offer_id").alias("offer_id"),
    last("event").alias("last_event")
).withColumn(
    "target",
    when(col("last_event") == "offer completed", 1).otherwise(0)
).select(
    "account_id", "amount", "offer_id", "target"
)

df_aggregated.show(5)



+--------------------+------+--------------------+------+
|          account_id|amount|            offer_id|target|
+--------------------+------+--------------------+------+
|0020ccbbb6d84e358...|  7.47|                NULL|     0|
|004b041fbfe448599...| 19.93|fafdcd668e3743c1b...|     1|
|0056df74b63b42988...| 29.89|                NULL|     0|
|008d7088107b46889...|  2.46|5a8bc65990b245e5a...|     0|
|0092a132ead946ceb...|  1.32|2906b810c7d441179...|     0|
+--------------------+------+--------------------+------+
only showing top 5 rows



                                                                                

In [70]:
df_aggregated = df_transactions.groupBy("account_id").agg(
    last("amount").alias("amount"),
    last("event").alias("last_event"),
    last("offer_id").alias("offer_id"),
)

# Create the 'target' column based on the last event
df_aggregated = df_aggregated.withColumn(
    "target",
    when(col("last_event") == "offer completed", 1).otherwise(0)
)

# Show the resulting DataFrame
df_aggregated.show(5)




+--------------------+------+---------------+--------------------+------+
|          account_id|amount|     last_event|            offer_id|target|
+--------------------+------+---------------+--------------------+------+
|0020ccbbb6d84e358...|  7.47|    transaction|                NULL|     0|
|004b041fbfe448599...| 19.93|offer completed|fafdcd668e3743c1b...|     1|
|0056df74b63b42988...| 29.89|    transaction|                NULL|     0|
|008d7088107b46889...|  2.46|   offer viewed|5a8bc65990b245e5a...|     0|
|0092a132ead946ceb...|  1.32|   offer viewed|2906b810c7d441179...|     0|
+--------------------+------+---------------+--------------------+------+
only showing top 5 rows



                                                                                

In [71]:
df_transactions = df_aggregated.filter(col("offer_id").isNotNull())

In [72]:
df_transactions.show(10)



+--------------------+------+---------------+--------------------+------+
|          account_id|amount|     last_event|            offer_id|target|
+--------------------+------+---------------+--------------------+------+
|004b041fbfe448599...| 19.93|offer completed|fafdcd668e3743c1b...|     1|
|008d7088107b46889...|  2.46|   offer viewed|5a8bc65990b245e5a...|     0|
|0092a132ead946ceb...|  1.32|   offer viewed|2906b810c7d441179...|     0|
|00aee28bbb3848dd8...|  2.46|   offer viewed|ae264e3637204a6fb...|     0|
|00bbce6533f44ddea...|  5.39|   offer viewed|fafdcd668e3743c1b...|     0|
|00bc983061d3471e8...|  21.4| offer received|2906b810c7d441179...|     0|
|016871ea865d43389...| 19.04| offer received|fafdcd668e3743c1b...|     0|
|0206e1388c34454ca...| 20.86|   offer viewed|5a8bc65990b245e5a...|     0|
|029e063479234fb1b...| 20.18|   offer viewed|4d5c57ea9a6940dd8...|     0|
|02a3aa431c1047be8...| 15.97|offer completed|ae264e3637204a6fb...|     1|
+--------------------+------+---------

                                                                                

### Tratamento Costumers

In [74]:
file_location = "/Users/alyssonamaral/Documents/ifood-case/case_data/profile.json"
file_type = "json"

infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

df_customers = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

df_customers.show(5)

+---+-----------------+------+--------------------+-------------+
|age|credit_card_limit|gender|                  id|registered_on|
+---+-----------------+------+--------------------+-------------+
|118|             NULL|  NULL|68be06ca386d4c319...|     20170212|
| 55|         112000.0|     F|0610b486422d4921a...|     20170715|
|118|             NULL|  NULL|38fe809add3b4fcf9...|     20180712|
| 75|         100000.0|     F|78afa995795e4d85b...|     20170509|
|118|             NULL|  NULL|a03223e636434f42a...|     20170804|
+---+-----------------+------+--------------------+-------------+
only showing top 5 rows



In [75]:
df_age_118 = df_customers.filter(col("age") == 118)

gender_null_count = df_age_118.filter(isnull(col("gender"))).count()
gender_not_null_count = df_age_118.filter(col("gender").isNotNull()).count()

credit_card_null_count = df_age_118.filter(isnull(col("credit_card_limit"))).count()
credit_card_not_null_count = df_age_118.filter(col("credit_card_limit").isNotNull()).count()

print(f"Quantidade de registros com gender nulo: {gender_null_count}")
print(f"Quantidade de registros com gender não nulo: {gender_not_null_count}")
print(f"Quantidade de registros com credit_card_limit nulo: {credit_card_null_count}")
print(f"Quantidade de registros com credit_card_limit não nulo: {credit_card_not_null_count}")

Quantidade de registros com gender nulo: 2175
Quantidade de registros com gender não nulo: 0
Quantidade de registros com credit_card_limit nulo: 2175
Quantidade de registros com credit_card_limit não nulo: 0


### Tratamento do offers

In [76]:
file_location = "/Users/alyssonamaral/Documents/ifood-case/case_data/offers.json"
file_type = "json"

infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

df_offers = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

df_offers.show(5)

+--------------------+--------------+--------+--------------------+---------+-------------+
|            channels|discount_value|duration|                  id|min_value|   offer_type|
+--------------------+--------------+--------+--------------------+---------+-------------+
|[email, mobile, s...|            10|     7.0|ae264e3637204a6fb...|       10|         bogo|
|[web, email, mobi...|            10|     5.0|4d5c57ea9a6940dd8...|       10|         bogo|
|[web, email, mobile]|             0|     4.0|3f207df678b143eea...|        0|informational|
|[web, email, mobile]|             5|     7.0|9b98b8c7a33c4b65b...|        5|         bogo|
|        [web, email]|             5|    10.0|0b1e1539f2cc45b7b...|       20|     discount|
+--------------------+--------------+--------+--------------------+---------+-------------+
only showing top 5 rows



In [77]:
df_offers = df_offers.withColumn("channel_web", when(array_contains(col("channels"), "web"), 1).otherwise(0)) \
                     .withColumn("channel_email", when(array_contains(col("channels"), "email"), 1).otherwise(0)) \
                     .withColumn("channel_mobile", when(array_contains(col("channels"), "mobile"), 1).otherwise(0)) \
                     .withColumn("channel_social", when(array_contains(col("channels"), "social"), 1).otherwise(0))

df_offers = df_offers.drop("channels")

df_offers = df_offers.withColumn(
    "offer_bogo",
    when(col("offer_type") == "bogo", 1).otherwise(0)
).withColumn(
    "offer_informational",
    when(col("offer_type") == "informational", 1).otherwise(0)
).withColumn(
    "offer_discount",
    when(col("offer_type") == "discount", 1).otherwise(0)
)

df_offers = df_offers.drop("offer_type")

df_offers.show()

+--------------+--------+--------------------+---------+-----------+-------------+--------------+--------------+----------+-------------------+--------------+
|discount_value|duration|                  id|min_value|channel_web|channel_email|channel_mobile|channel_social|offer_bogo|offer_informational|offer_discount|
+--------------+--------+--------------------+---------+-----------+-------------+--------------+--------------+----------+-------------------+--------------+
|            10|     7.0|ae264e3637204a6fb...|       10|          0|            1|             1|             1|         1|                  0|             0|
|            10|     5.0|4d5c57ea9a6940dd8...|       10|          1|            1|             1|             1|         1|                  0|             0|
|             0|     4.0|3f207df678b143eea...|        0|          1|            1|             1|             0|         0|                  1|             0|
|             5|     7.0|9b98b8c7a33c4b65b...|

### Criação do df_modelo

#### Junção e tratamento

In [78]:
df_transactions.printSchema()

df_customers.printSchema()

df_offers.printSchema()

root
 |-- account_id: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- last_event: string (nullable = true)
 |-- offer_id: string (nullable = true)
 |-- target: integer (nullable = false)

root
 |-- age: long (nullable = true)
 |-- credit_card_limit: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: string (nullable = true)
 |-- registered_on: string (nullable = true)

root
 |-- discount_value: long (nullable = true)
 |-- duration: double (nullable = true)
 |-- id: string (nullable = true)
 |-- min_value: long (nullable = true)
 |-- channel_web: integer (nullable = false)
 |-- channel_email: integer (nullable = false)
 |-- channel_mobile: integer (nullable = false)
 |-- channel_social: integer (nullable = false)
 |-- offer_bogo: integer (nullable = false)
 |-- offer_informational: integer (nullable = false)
 |-- offer_discount: integer (nullable = false)



In [82]:
df_modelo = df_transactions.join(
    df_customers,
    df_transactions["account_id"] == df_customers["id"],
    "left"
).drop(df_customers["id"])

df_modelo = df_modelo.join(
    df_offers,
    df_modelo["offer_id"] == df_offers["id"],
    "left"
).drop(df_offers["id"])

df_modelo.show()



+--------------------+------+---------------+--------------------+------+---+-----------------+------+-------------+--------------+--------+---------+-----------+-------------+--------------+--------------+----------+-------------------+--------------+
|          account_id|amount|     last_event|            offer_id|target|age|credit_card_limit|gender|registered_on|discount_value|duration|min_value|channel_web|channel_email|channel_mobile|channel_social|offer_bogo|offer_informational|offer_discount|
+--------------------+------+---------------+--------------------+------+---+-----------------+------+-------------+--------------+--------+---------+-----------+-------------+--------------+--------------+----------+-------------------+--------------+
|004b041fbfe448599...| 19.93|offer completed|fafdcd668e3743c1b...|     1| 55|          74000.0|     F|     20180508|             2|    10.0|       10|          1|            1|             1|             1|         0|                  0|    

                                                                                

In [84]:
df_modelo = df_modelo.filter(col("amount").isNotNull())

In [86]:
df_modelo = df_modelo.withColumn(
    "gender_numeric",
    when(col("gender") == "O", 0)
    .when(col("gender") == "F", 1)
    .when(col("gender") == "M", 2)
    .otherwise(3)
)

In [87]:
df_modelo = df_modelo.drop(df_modelo["gender"])

In [93]:
# Calcular a média de credit_card_limit para os registros com age != 118 e com o mesmo event
df_avg_credit_card = df_modelo.filter(col("age") != 118) \
    .groupBy("last_event") \
    .agg({"credit_card_limit": "avg"}) \
    .withColumnRenamed("avg(credit_card_limit)", "avg_credit_card_limit")

# Juntar o DataFrame com a média de credit_card_limit e preencher os valores nulos
df_modelo_filled = df_modelo.join(df_avg_credit_card, "last_event", "left") \
    .withColumn(
        "credit_card_limit",
        when(col("age") == 118, col("avg_credit_card_limit")).otherwise(col("credit_card_limit"))
    ).drop("avg_credit_card_limit")

df_modelo = df_modelo_filled

In [94]:
df_modelo.show(5)



+---------------+--------------------+------+--------------------+------+---+-----------------+-------------+--------------+--------+---------+-----------+-------------+--------------+--------------+----------+-------------------+--------------+--------------+
|     last_event|          account_id|amount|            offer_id|target|age|credit_card_limit|registered_on|discount_value|duration|min_value|channel_web|channel_email|channel_mobile|channel_social|offer_bogo|offer_informational|offer_discount|gender_numeric|
+---------------+--------------------+------+--------------------+------+---+-----------------+-------------+--------------+--------+---------+-----------+-------------+--------------+--------------+----------+-------------------+--------------+--------------+
| offer received|00bc983061d3471e8...|  21.4|2906b810c7d441179...|     0| 59|          77000.0|     20171016|             2|     7.0|       10|          1|            1|             1|             0|         0|       

                                                                                

In [95]:
output_path = "/Users/alyssonamaral/Documents/ifood-case/case_data"
df_modelo.write.csv(output_path, header=True, mode="overwrite")

                                                                                