In [1]:
import os

import boto3

import pyspark.sql.functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.sql.window import Window

In [2]:
import pandas as pd
import matplotlib.pyplot as plt

# Initialisation de Spark

In [3]:
# Build (or reuse) the Spark session used by the rest of the notebook.
# The driver is bound to localhost:9000 and the job runs on all local cores.
spark = (
    SparkSession.builder
    .config("spark.driver.host", "localhost")
    .config("spark.driver.port", "9000")
    .master("local[*]")
    .appName("Projet de Fin d'Etude")
    .getOrCreate()
)

# Show the session summary as the cell output.
spark

In [4]:
# Keep a handle on the SparkContext that backs the session created above.
sc = spark.sparkContext

# Chargement des données structurées

In [5]:
# Connection settings for the RDS (MySQL) schema that holds the raw data.
# The user and password are taken from the RDS_USER / RDS_PASSWORD environment
# variables so that real secrets never have to be typed into (or committed
# with) the notebook; the previous literals are kept as fallbacks so the cell
# behaves as before when the variables are not set.
jdbc_url = "jdbc:mysql://database-grp6.chcmvnn5gnpv.us-east-1.rds.amazonaws.com:3306/raw_data?serverTimezone=UTC"
db_properties = {
    "user": os.environ.get("RDS_USER", "admin"),
    "password": os.environ.get("RDS_PASSWORD", "xxxx"),
    "driver": "com.mysql.jdbc.Driver",
}

In [6]:
# Names of the RDS tables to load; each one becomes a key of `dataframes`.
table_names = [
    # order-related tables
    "orders", "order_items", "order_payments", "order_reviews",
    # product tables
    "products", "products_translated",
    # location tables
    "geolocation", "states_name",
]

In [7]:
def _load_table(name):
    """Read one RDS table over JDBC and return it as a Spark DataFrame."""
    return spark.read.jdbc(url=jdbc_url, table=name, properties=db_properties)


# Dictionary mapping each table name to its DataFrame.
dataframes = {name: _load_table(name) for name in table_names}
dataframes

{'orders': DataFrame[order_id: string, customer_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp],
 'order_items': DataFrame[order_id: string, order_item_id: int, product_id: string, seller_id: string, shipping_limit_date: timestamp, price: double, freight_value: double],
 'order_payments': DataFrame[order_id: string, payment_sequential: int, payment_type: string, payment_installments: int, payment_value: double],
 'order_reviews': DataFrame[review_id: string, order_id: string, review_score: int, review_comment_title: string, review_comment_message: string, review_creation_date: timestamp, review_answer_timestamp: timestamp],
 'products': DataFrame[product_id: string, product_category_name: string, product_name_lenght: int, product_description_lenght: int, product_photos_qty: int, product_weight_g: int, product_l

# Chargement des données non structurées

In [8]:
# AWS credentials and region used for the DynamoDB connection.
# The keys are read from the standard AWS environment variables so that they
# never have to be written into the notebook; when the variables are not set
# the values fall back to the empty strings the cell used before.
aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID', '')
aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY', '')
region_name = 'us-east-1'

In [9]:
# DynamoDB service resource, created with the credentials and region set above.
dynamodb = boto3.resource(
    'dynamodb',
    region_name=region_name,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)

In [10]:
# Read the "customers" table from DynamoDB.
table = dynamodb.Table("customers")

# Fetch every item of the table. A single Scan call returns at most one page
# (up to 1 MB of data), so keep scanning from LastEvaluatedKey until the
# response no longer carries one; otherwise the table is silently truncated.
response = table.scan()
items = response['Items']
while 'LastEvaluatedKey' in response:
    response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
    items.extend(response['Items'])

# Convert the items into a Spark DataFrame.
df = spark.createDataFrame(items)

# Keep only the customer fields, in a fixed column order.
dataframes["customers"] = df.select(
    "customer_id",
    "customer_unique_id",
    "customer_zip_code",
    "customer_city",
    "customer_state",
)

In [11]:
# Read the "sellers" table from DynamoDB.
table = dynamodb.Table("sellers")

# Fetch every item of the table. A single Scan call returns at most one page
# (up to 1 MB of data), so keep scanning from LastEvaluatedKey until the
# response no longer carries one; otherwise the table is silently truncated.
response = table.scan()
items = response['Items']
while 'LastEvaluatedKey' in response:
    response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
    items.extend(response['Items'])

# Convert the items into a Spark DataFrame.
df = spark.createDataFrame(items)

# Keep only the seller fields, in a fixed column order.
dataframes["sellers"] = df.select(
    "seller_id",
    "seller_zip_code",
    "seller_city",
    "seller_state",
)

In [12]:
# Display the dictionary of DataFrames, which now also holds the "customers"
# and "sellers" entries loaded from DynamoDB.
dataframes

{'orders': DataFrame[order_id: string, customer_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp],
 'order_items': DataFrame[order_id: string, order_item_id: int, product_id: string, seller_id: string, shipping_limit_date: timestamp, price: double, freight_value: double],
 'order_payments': DataFrame[order_id: string, payment_sequential: int, payment_type: string, payment_installments: int, payment_value: double],
 'order_reviews': DataFrame[review_id: string, order_id: string, review_score: int, review_comment_title: string, review_comment_message: string, review_creation_date: timestamp, review_answer_timestamp: timestamp],
 'products': DataFrame[product_id: string, product_category_name: string, product_name_lenght: int, product_description_lenght: int, product_photos_qty: int, product_weight_g: int, product_l

# Changement du type des variables

In [13]:
# Cast the zip-code column of the two DynamoDB-sourced DataFrames to integer.
for frame_key, zip_column in (
    ("customers", "customer_zip_code"),
    ("sellers", "seller_zip_code"),
):
    dataframes[frame_key] = dataframes[frame_key].withColumn(
        zip_column, F.col(zip_column).cast("int")
    )

In [14]:
# Print the schema of both DataFrames to confirm the zip-code columns are integers.
for schema_key in ("customers", "sellers"):
    dataframes[schema_key].printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)

root
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)



# Renommage des colonnes

In [15]:
# Fix the misspelled "lenght" column names of the products DataFrame.
column_typos = {
    "product_name_lenght": "product_name_length",
    "product_description_lenght": "product_description_length",
}
products_renamed = dataframes["products"]
for misspelled, corrected in column_typos.items():
    products_renamed = products_renamed.withColumnRenamed(misspelled, corrected)
dataframes["products"] = products_renamed

# Nettoyage des données dupliquées

In [16]:
# Go through every DataFrame, count its fully duplicated rows and display
# them when there are any.
for key, df in dataframes.items():
    # Group on all columns: a group with count > 1 is a duplicated row.
    duplicate_rows = df.groupBy(df.columns).count().where(F.col("count") > 1)
    # count() launches a Spark job each time it is called, so evaluate it once
    # and reuse the result for both the message and the condition.
    duplicate_count = duplicate_rows.count()
    print(f"Duplicates lines in {key} DataFrame : {duplicate_count}")
    if duplicate_count > 0:
        duplicate_rows.show(truncate=False)

Duplicates lines in orders DataFrame : 0
Duplicates lines in order_items DataFrame : 0
Duplicates lines in order_payments DataFrame : 0
Duplicates lines in order_reviews DataFrame : 0
Duplicates lines in products DataFrame : 0
Duplicates lines in products_translated DataFrame : 0
Duplicates lines in geolocation DataFrame : 128174
+---------------------------+-------------------+-------------------+----------------+-----------------+-----+
|geolocation_zip_code_prefix|geolocation_lat    |geolocation_lng    |geolocation_city|geolocation_state|count|
+---------------------------+-------------------+-------------------+----------------+-----------------+-----+
|1026                       |-23.53925652899767 |-46.633440525535235|sao paulo       |SP               |2    |
|1106                       |-23.531062064948905|-46.629505278925556|sao paulo       |SP               |2    |
|1241                       |-23.545792634196992|-46.6567234730798  |sao paulo       |SP               |3    |
|1

In [17]:
# Drop the fully duplicated rows of the geolocation DataFrame.
df = dataframes["geolocation"].dropDuplicates()
dataframes["geolocation"] = df

# Check that no duplicated row is left: group on all columns and keep the
# groups that still appear more than once.
duplicate_rows = (
    df.groupBy(df.columns)
    .count()
    .where(F.col("count") > 1)
)
print(f"Duplicates lines in geolocation DataFrame : {duplicate_rows.count()}")

Duplicates lines in geolocation DataFrame : 0


# Suppression des valeurs dupliquées de l'ID dans la table order_items

In [18]:
# Show every line item of one sample order, before the per-order reduction.
sample_order_id = '00143d0f86d6fbd9f9b38ab440ac16f5'
(
    dataframes["order_items"]
    .where(F.col('order_id') == sample_order_id)
    .show(truncate=False)
)

+--------------------------------+-------------+--------------------------------+--------------------------------+-------------------+-----+-------------+
|order_id                        |order_item_id|product_id                      |seller_id                       |shipping_limit_date|price|freight_value|
+--------------------------------+-------------+--------------------------------+--------------------------------+-------------------+-----+-------------+
|00143d0f86d6fbd9f9b38ab440ac16f5|1            |e95ee6822b66ac6058e2e4aff656071a|a17f621c590ea0fab3d5d883e1630ec6|2017-10-20 18:07:52|21.33|15.1         |
|00143d0f86d6fbd9f9b38ab440ac16f5|2            |e95ee6822b66ac6058e2e4aff656071a|a17f621c590ea0fab3d5d883e1630ec6|2017-10-20 18:07:52|21.33|15.1         |
|00143d0f86d6fbd9f9b38ab440ac16f5|3            |e95ee6822b66ac6058e2e4aff656071a|a17f621c590ea0fab3d5d883e1630ec6|2017-10-20 18:07:52|21.33|15.1         |
+--------------------------------+-------------+----------------------

In [19]:
# Keep a single row per order: the one carrying the highest order_item_id.
# All other line items of the order are discarded.

# Highest 'order_item_id' for each 'order_id'.
max_order_items_id = (
    dataframes["order_items"]
    .groupBy("order_id")
    .agg(F.max("order_item_id").alias("max_order_item_id"))
)

# Join back onto the original rows to recover every column. Both sides derive
# from the same DataFrame, so they are given explicit aliases and the join
# condition refers to the columns through those aliases; this avoids relying
# on Spark's resolution of ambiguous self-join column references.
order_items_joined = dataframes["order_items"].alias("items").join(
    max_order_items_id.alias("latest"),
    (F.col("items.order_id") == F.col("latest.order_id"))
    & (F.col("items.order_item_id") == F.col("latest.max_order_item_id")),
)

# Keep only the original order_items columns (drop the helper columns).
dataframes["order_items"] = order_items_joined.select("items.*")

In [20]:
# Show the same sample order again to check that a single row is left.
sample_order_id = '00143d0f86d6fbd9f9b38ab440ac16f5'
(
    dataframes["order_items"]
    .where(F.col('order_id') == sample_order_id)
    .show(truncate=False)
)

+--------------------------------+-------------+--------------------------------+--------------------------------+-------------------+-----+-------------+
|order_id                        |order_item_id|product_id                      |seller_id                       |shipping_limit_date|price|freight_value|
+--------------------------------+-------------+--------------------------------+--------------------------------+-------------------+-----+-------------+
|00143d0f86d6fbd9f9b38ab440ac16f5|3            |e95ee6822b66ac6058e2e4aff656071a|a17f621c590ea0fab3d5d883e1630ec6|2017-10-20 18:07:52|21.33|15.1         |
+--------------------------------+-------------+--------------------------------+--------------------------------+-------------------+-----+-------------+



# Remplacement des noms de catégories : du portugais vers l'anglais

In [21]:
# Check that every Portuguese category name has an English translation:
# take the distinct category names of both tables and list those found in
# products but missing from products_translated.
p, t = (
    dataframes[source].select("product_category_name").distinct()
    for source in ("products", "products_translated")
)

p.subtract(t).show()

+---------------------+
|product_category_name|
+---------------------+
|                     |
+---------------------+



In [22]:
# Left-join products with products_translated on product_category_name, drop
# the Portuguese category column(s), and expose the English name under the
# original column name.
translations = dataframes["products_translated"].alias("pt")
joined_df = (
    dataframes["products"].alias("p")
    .join(
        translations,
        F.col("p.product_category_name") == F.col("pt.product_category_name"),
        "left",
    )
    .drop("product_category_name")
    .withColumnRenamed("product_category_name_english", "product_category_name")
)

# Reorder the columns: first column, then the category name (currently the
# last column), then the remaining columns in their existing order.
all_columns = joined_df.columns
new_columns = [all_columns[0], "product_category_name", *all_columns[1:-1]]

# Store the result back into the dictionary and preview it.
dataframes["products"] = joined_df.select(new_columns)
dataframes["products"].show(5)


+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|          product_id|product_category_name|product_name_length|product_description_length|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|00088930e925c41fd...|                 auto|                 56|                       752|                 4|            1225|               55|               10|              26|
|0011c512eb256aa0d...|                 auto|                 58|                       177|                 1|             100|               16|               15|              16|
|000d9be29b5207b54...|        watches_gifts|                 48|                       613|    

# Nettoyage des valeurs nulles

In [23]:
def count_nulls(df):
    """Return a one-row DataFrame holding the number of nulls in each column.

    Args:
        df: Spark DataFrame to inspect.

    Returns:
        A DataFrame with the same column names as ``df``; each value is the
        sum of the per-row null flags (1 when null, 0 otherwise) of that column.
    """
    null_sums = []
    for column_name in df.columns:
        null_flag = F.col(column_name).isNull().cast("int")
        null_sums.append(F.sum(null_flag).alias(column_name))
    return df.select(null_sums)

In [24]:
# Report the per-column null counts of every DataFrame in the dictionary.
for key in dataframes:
    df = dataframes[key]
    null_counts = count_nulls(df)
    print(f"Null values in {key} DataFrame :")
    null_counts.show(truncate=False)

Null values in orders DataFrame :
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|order_id|customer_id|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|0       |0          |0           |0                       |160              |1783                        |2965                         |0                            |
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+

Null values in order_items DataFrame :
+--------+-------------+----------+---------+-------------------+-----+-------------+


In [25]:
# Replace the null values of "products": category name -> "unknown",
# integer columns -> -1 (sentinel for a missing measurement).
# fillna() with a dict only touches the listed columns and leaves non-null
# values untouched, which is what the former when/otherwise chain did.
products_null_defaults = {
    "product_category_name": "unknown",
    "product_name_length": -1,
    "product_description_length": -1,
    "product_photos_qty": -1,
    "product_weight_g": -1,
    "product_length_cm": -1,
    "product_height_cm": -1,
    "product_width_cm": -1,
}
dataframes["products"] = dataframes["products"].fillna(products_null_defaults)

# Check that every null value of products has been replaced.
null_counts = count_nulls(dataframes["products"])
print("Null values in products DataFrame :")
null_counts.show(truncate=False)

Null values in products DataFrame :
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|product_id|product_category_name|product_name_length|product_description_length|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|0         |0                    |0                  |0                         |0                 |0               |0                |0                |0               |
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+



# Ajout de champs calculés

In [26]:
# Add the order totals to order_items:
#   total_items_value   = price * order_item_id
#   total_freight_value = freight_value * order_item_id
#   total_order_value   = total_items_value + total_freight_value
# NOTE(review): order_item_id is used as a multiplier here, presumably as the
# number of items in the order; that only holds if every item of an order has
# the same price and freight value. Confirm against the data.
item_multiplier = F.col("order_item_id")
dataframes["order_items"] = (
    dataframes["order_items"]
    .withColumn("total_items_value", F.col("price") * item_multiplier)
    .withColumn("total_freight_value", F.col("freight_value") * item_multiplier)
    .withColumn(
        "total_order_value",
        F.col("total_items_value") + F.col("total_freight_value"),
    )
)

dataframes["order_items"].show(10)

+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+-----------------+-------------------+------------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|total_items_value|total_freight_value| total_order_value|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+-----------------+-------------------+------------------+
|0103878b7ed86b6ec...|            1|447934cd014b03328...|b4f8921fcc4ff77b6...|2018-08-20 02:15:41|229.99|        35.37|           229.99|              35.37|            265.36|
|01fba151a6fa8315f...|            1|c1488892604e4ba5c...|1554a68530182680a...|2017-07-14 16:23:25|  38.5|        19.88|             38.5|              19.88|58.379999999999995|
|021f26893462ec6a6...|            1|f6fa93875f4a7dac0...|391fc6631aebcf300...|2017-07-18 17:32:13| 49.95|        11

# Chargement des Dataframes nettoyés en Table

In [27]:
# Connection settings for the RDS (MySQL) schema that receives the cleaned data.
# The user and password are taken from the RDS_USER / RDS_PASSWORD environment
# variables so that real secrets never have to be typed into (or committed
# with) the notebook; the previous literals are kept as fallbacks so the cell
# behaves as before when the variables are not set.
jdbc_url = "jdbc:mysql://database-grp6.chcmvnn5gnpv.us-east-1.rds.amazonaws.com:3306/cleaned_data?serverTimezone=UTC"
db_properties = {
    "user": os.environ.get("RDS_USER", "admin"),
    "password": os.environ.get("RDS_PASSWORD", "xxx"),
    "driver": "com.mysql.jdbc.Driver",
}

In [28]:
# Write every DataFrame of the dictionary to the RDS table of the same name,
# in overwrite mode, logging the start and end of each write.
for table_name, df in dataframes.items():
    print("Début : ", table_name)
    overwrite_writer = df.write.mode("overwrite")
    overwrite_writer.jdbc(url=jdbc_url, table=table_name, properties=db_properties)
    print("Fin : ", table_name)

Début :  orders
Fin :  orders
Début :  order_items
Fin :  order_items
Début :  order_payments
Fin :  order_payments
Début :  order_reviews
Fin :  order_reviews
Début :  products
Fin :  products
Début :  products_translated
Fin :  products_translated
Début :  geolocation
Fin :  geolocation
Début :  states_name
Fin :  states_name
Début :  customers
Fin :  customers
Début :  sellers
Fin :  sellers
