In [1]:
import pandas as pd

In [2]:
!pip install pyspark




[notice] A new release of pip available: 22.3.1 -> 23.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


## Customers

In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("teste").getOrCreate()

spark.sparkContext.setLogLevel("WARN")

df = (
    spark
    .read
    .format("csv")
    .options(header='true', inferSchema='true', delimiter=",")
    .load("C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\olist_customers_dataset.csv")
)

df.printSchema()

(
    df
    .write
    .mode("overwrite")
    .format("parquet")
    .save("C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\parquet\\")
)

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [4]:
# Import data:
df = spark.read.parquet('C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\parquet\\')

# Clean data:
df = df.na.drop()

# Select important features from data
columns = [
    "customer_id",
    "customer_unique_id",
    "customer_zip_code_prefix",
    "customer_state"
]

customers = df.select(*columns)

(
    customers
    .write
    .mode("overwrite")
    .format("parquet")
    .save("C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\customers\\")
)

customers.show()

+--------------------+--------------------+------------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|customer_state|
+--------------------+--------------------+------------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            SP|
|879864dab9bc30475...|4c93744516667ad3b...|                   89254|            SC|
|fd826e7cf63160e53...|addec96d2e059c80c...|                    4534|            SP|
|5e274e7a0c3809e14...|57b2a98a409812fe9...|                   35182|            MG|
|5adf08e34b2e99398...|1175e95fb47ddff9d...|                   81560|        

## Order Payments

In [5]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("teste").getOrCreate()

spark.sparkContext.setLogLevel("WARN")

df = (
    spark
    .read
    .format("csv")
    .options(header='true', inferSchema='true', delimiter=",")
    .load("C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\olist_order_payments_dataset.csv")
)

df.printSchema()

(
    df
    .write
    .mode("overwrite")
    .format("parquet")
    .save("C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\parquet\\")
)

root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)



In [6]:
# Import data:
df = spark.read.parquet('C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\parquet\\')

# Clean data:
df = df.na.drop()

# Select important features from data
columns = [
    "order_id",
    "payment_type",
    "payment_value"
]
payments = df.select(*columns)

(
    payments
    .write
    .mode("overwrite")
    .format("parquet")
    .save("C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\payments\\")
)

payments.show()

+--------------------+------------+-------------+
|            order_id|payment_type|payment_value|
+--------------------+------------+-------------+
|b81ef226f3fe1789b...| credit_card|        99.33|
|a9810da82917af2d9...| credit_card|        24.39|
|25e8ea4e93396b6fa...| credit_card|        65.71|
|ba78997921bbcdc13...| credit_card|       107.78|
|42fdf880ba16b47b5...| credit_card|       128.45|
|298fcdf1f73eb413e...| credit_card|        96.12|
|771ee386b001f0620...| credit_card|        81.16|
|3d7239c394a212faa...| credit_card|        51.84|
|1f78449c87a54faf9...| credit_card|       341.09|
|0573b5e23cbd79800...|      boleto|        51.95|
|d88e0d5fa41661ce0...| credit_card|       188.73|
|2480f727e869fdeb3...| credit_card|        141.9|
|616105c9352a9668c...| credit_card|        75.78|
|cf95215a722f3ebf2...| credit_card|       102.66|
|769214176682788a9...| credit_card|       105.28|
|12e5cfe0e4716b59a...| credit_card|       157.45|
|61059985a6fc0ad64...| credit_card|       132.04|


## Order Items

In [7]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.master("local[1]").appName("teste").getOrCreate()

spark.sparkContext.setLogLevel("WARN")

df = (
    spark
    .read
    .format("csv")
    .options(header='true', inferSchema='true', delimiter=",")
    .load("C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\olist_order_items_dataset.csv")
)

df.printSchema()

(
    df
    .write
    .mode("overwrite")
    .format("parquet")
    .save("C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\parquet\\")
)

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)



In [8]:
# Import data:
df = spark.read.parquet('C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\parquet\\')

# Clean data:
df = df.na.drop()

# Select important features from data
columns = [
    "order_id",
    "product_id",
    "price",
    "freight_value"
]
df = df.select(*columns)

# Total Price:
items = df.select(
    col("order_id"),
    col("product_id"),
    col("price"),
    col("freight_value"),
    ((col("price") + col("freight_value"))).alias("price_total")
)

(
    items
    .write
    .mode("overwrite")
    .format("parquet")
    .save("C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\items\\")
)

items.show()

+--------------------+--------------------+------+-------------+------------------+
|            order_id|          product_id| price|freight_value|       price_total|
+--------------------+--------------------+------+-------------+------------------+
|00010242fe8c5a6d1...|4244733e06e7ecb49...|  58.9|        13.29|             72.19|
|00018f77f2f0320c5...|e5f2d52b802189ee6...| 239.9|        19.93|            259.83|
|000229ec398224ef6...|c777355d18b72b67a...| 199.0|        17.87|            216.87|
|00024acbcdf0a6daa...|7634da152a4610f15...| 12.99|        12.79|             25.78|
|00042b26cf59d7ce6...|ac6c3623068f30de0...| 199.9|        18.14|218.04000000000002|
|00048cc3ae777c65d...|ef92defde845ab845...|  21.9|        12.69|34.589999999999996|
|00054e8431b9d7675...|8d4f2bb7e93e6710a...|  19.9|        11.85|             31.75|
|000576fe39319847c...|557d850972a7d6f79...| 810.0|        70.75|            880.75|
|0005a1a1728c9d785...|310ae3c140ff94b03...|145.95|        11.65|            

## Order Reviews

In [9]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.master("local[1]").appName("teste").getOrCreate()

spark.sparkContext.setLogLevel("WARN")

df = (
    spark
    .read
    .format("csv")
    .options(header='true', inferSchema='true', delimiter=",")
    .load("C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\olist_order_reviews_dataset.csv")
)

df.printSchema()

(
    df
    .write
    .mode("overwrite")
    .format("parquet")
    .save("C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\parquet\\")
)

root
 |-- review_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_score: string (nullable = true)
 |-- review_comment_title: string (nullable = true)
 |-- review_comment_message: string (nullable = true)
 |-- review_creation_date: string (nullable = true)
 |-- review_answer_timestamp: string (nullable = true)



In [10]:
# Import data:
df = spark.read.parquet('C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\parquet\\')

# Clean data:
df = df.na.drop()

# Select important features from data
columns = [
    "review_id",
    "order_id",
    "review_score"
]

reviews = df.select(*columns)

(
    reviews
    .write
    .mode("overwrite")
    .format("parquet")
    .save("C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\reviews\\")
)

reviews.show()

+--------------------+--------------------+------------+
|           review_id|            order_id|review_score|
+--------------------+--------------------+------------+
|8670d52e15e00043a...|b9bf720beb4ab3728...|           4|
|3948b09f7c818e2d8...|e51478e7e277a8374...|           5|
|373cbeecea8286a2b...|583174fbe37d3d5f0...|           1|
|d21bbc789670eab77...|4fc44d78867142c62...|           5|
|c92cdd7dd544a01aa...|37e7875cdce5a9e5b...|           4|
|08c9d79ec0eba1d25...|e029f708df3cc108b...|           5|
|b193ff3c9f32a01f3...|e2e6ee1ed2d7f2f36...|           5|
|86c5cfa7fcbde303f...|a6456e781cb962cc3...|           5|
|500c05500aa275953...|8a9424899aac432d8...|           5|
|109b5ce2dd11bb846...|25362fbf6aac4b01a...|           5|
|c45811d9f90e22a81...|491f193fc52075598...|           5|
|50a1eaa2f96d6f3e0...|4a7cf245701068d38...|           5|
|1692078634b63c7f2...|5bc4e94aef2841f39...|           5|
|46d8249ea59101c72...|f25ddb6cd62d720a5...|           3|
|79927442ebcbf70b2...|1c8898140

## Order Dataset

In [11]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.master("local[1]").appName("teste").getOrCreate()

spark.sparkContext.setLogLevel("WARN")

df = (
    spark
    .read
    .format("csv")
    .options(header='true', inferSchema='true', delimiter=",")
    .load("C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\olist_orders_dataset.csv")
)

df.printSchema()

(
    df
    .write
    .mode("overwrite")
    .format("parquet")
    .save("C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\parquet\\")
)

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [12]:
# Import data:
df = spark.read.parquet('C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\parquet\\')

# Clean data:
df = df.na.drop()

# Select important features from data
columns = [
    "order_id",
    "customer_id",
    "order_status"
]

dataset = df.select(*columns)

(
    dataset
    .write
    .mode("overwrite")
    .format("parquet")
    .save("C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\dataset\\")
)

dataset.show()

+--------------------+--------------------+------------+
|            order_id|         customer_id|order_status|
+--------------------+--------------------+------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|   delivered|
|949d5b44dbf5de918...|f88197465ea7920ad...|   delivered|
|ad21c59c0840e6cb8...|8ab97904e6daea886...|   delivered|
|a4591c265e18cb1dc...|503740e9ca751ccdd...|   delivered|
|6514b8ad8028c9f2c...|9bdf08b4b3b52b552...|   delivered|
|76c6e866289321a7c...|f54a9f0e6b351c431...|   delivered|
|e69bfb5eb88e0ed6a...|31ad1d1b63eb99624...|   delivered|
|e6ce16cb79ec1d90b...|494dded5b201313c6...|   delivered|
|34513ce0c4fab462a...|7711cf624183d843a...|   delivered|
|82566a660a982b15f...|d3e3b74c766bc6214...|   delivered|
|5ff96c15d0b717ac6...|19402a48fe860416a...|   delivered|
|432aaf21d85167c2c...|3df704f53d3f1d481...|   delivered|
|dcb36b511fcac050b...|3b6828a50

## Agregação

In [13]:
items = spark.read.parquet("C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\items\\")
payments = spark.read.parquet("C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\payments\\")
customers = spark.read.parquet("C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\customers\\")
reviews = spark.read.parquet("C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\reviews\\")
dataset = spark.read.parquet("C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\dataset\\")

agg1 = items.join(payments , on=['order_id'] , how = 'left')
agg2 = agg1.join(reviews , on=['order_id'] , how = 'left')
agg3 = agg2.join(dataset, on=['order_id'] , how = 'left')
agg = agg3.join(customers , on=['customer_id'] , how = 'left')

agg.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- price_total: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_value: double (nullable = true)
 |-- review_id: string (nullable = true)
 |-- review_score: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_state: string (nullable = true)



In [14]:
agg.show()

+--------------------+--------------------+--------------------+------+-------------+------------------+------------+-------------+--------------------+------------+------------+--------------------+------------------------+--------------+
|         customer_id|            order_id|          product_id| price|freight_value|       price_total|payment_type|payment_value|           review_id|review_score|order_status|  customer_unique_id|customer_zip_code_prefix|customer_state|
+--------------------+--------------------+--------------------+------+-------------+------------------+------------+-------------+--------------------+------------+------------+--------------------+------------------------+--------------+
|3ce436f183e68e078...|00010242fe8c5a6d1...|4244733e06e7ecb49...|  58.9|        13.29|             72.19| credit_card|        72.19|                null|        null|   delivered|871766c5855e863f6...|                   28013|            RJ|
|f6dd3ec061db4e398...|00018f77f2f0320c5.

In [18]:
(
    agg
    .write
    .mode("overwrite")
    .format("parquet")
    .save("C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\agg\\")
)

In [16]:
# (
#     agg
#     .write
#     .mode("overwrite")
#     .format("csv")
#     .save("C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\agg\\")
# )

## Recomendação

In [71]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import when
from pyspark.ml.feature import StringIndexer

rec = spark.read.parquet("C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\agg\\")
columns = [
    "customer_id",
    "price",
    "review_score"    
]

# Amostra dos dados:
rec = rec.select(*columns).limit(50000)
rec = rec.withColumn("review_score",rec.review_score.cast('int')).fillna(0)

# Codificando as colunas para o modelo:
indexer = (rec.select("customer_id").distinct().orderBy("customer_id").withColumn("cid", monotonically_increasing_id()))
rec = rec.join(indexer, ["customer_id"])

rec = rec.withColumn(
    "priceId",
    when(rec.price <= 10, 1)
    .when(rec.price <= 20, 2)
    .when(rec.price <= 30, 3)
    .when(rec.price <= 40, 4)
    .when(rec.price <= 50, 5)
    .when(rec.price <= 60, 6)
    .when(rec.price <= 70, 7)
    .when(rec.price <= 80, 8)
    .when(rec.price <= 90, 9)
    .when(rec.price <= 100, 10)
    .when(rec.price <= 110, 11)
    .when(rec.price <= 120, 12)
    .when(rec.price <= 130, 13)
    .when(rec.price <= 140, 14)
    .when(rec.price <= 150, 15)
    .otherwise(16))

columns = [
    "cid",
    "priceId",
    "review_score"
]

rec = rec.select(*columns).distinct().fillna(0)
rec = (
    rec
    .withColumn("review_score",rec.review_score.cast('int'))
    .withColumn("cid",rec.cid.cast('int'))
    .withColumn("priceId",rec.priceId.cast('int'))
    .fillna(0)
    .orderBy("cid")
)

(
    rec
    .write
    .mode("overwrite")
    .format("parquet")
    .save("C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\rec\\")
)

# Create test and train set
(train, test) = rec.randomSplit([0.8, 0.2], seed = 2020)

model = ALS(userCol="cid", itemCol="priceId", ratingCol="review_score", nonnegative=True, coldStartStrategy="drop").fit(train)
evaluator=RegressionEvaluator(metricName="rmse",labelCol="review_score",predictionCol="prediction")
predictions=model.transform(test)
rmse=evaluator.evaluate(predictions)
print("New RMSE: ", evaluator.evaluate(model.transform(test)))

pred = model.transform(rec)

(
    pred
    .write
    .mode("overwrite")
    .format("parquet")
    .save("C:\\Users\\Gabriel\\Desktop\\backup\\Repositorios\\MBA_project\\data\\olist\\archive\\pred\\")
)

pred.show(10)

New RMSE:  0.7153350863852701
+-----+-------+------------+----------+
|  cid|priceId|review_score|prediction|
+-----+-------+------------+----------+
| 9797|      6|           0|       0.0|
|39428|     16|           0|       0.0|
|16106|     16|           0|       0.0|
|34169|      2|           0|       0.0|
|20781|      3|           0|       0.0|
| 8155|      2|           0|       0.0|
|25534|     16|           0|       0.0|
| 3532|     15|           0|       0.0|
|32010|      6|           0|       0.0|
|17037|      5|           0|       0.0|
+-----+-------+------------+----------+
only showing top 10 rows



In [72]:
pred.printSchema()

root
 |-- cid: integer (nullable = false)
 |-- priceId: integer (nullable = false)
 |-- review_score: integer (nullable = true)
 |-- prediction: float (nullable = false)

