In [5]:
%pip install pyspark

Defaulting to user installation because normal site-packages is not writeable
You should consider upgrading via the '/Library/Developer/CommandLineTools/usr/bin/python3 -m pip install --upgrade pip' command.[0m
Note: you may need to restart the kernel to use updated packages.


In [6]:
# STEP 1: Set up PySpark Session in Google Colab

# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round, mean, stddev, min, max, expr, sum as spark_sum
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql.functions import col
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName("BrazilianEcommerce").getOrCreate()

In [7]:

# Load datasets
customers_df = spark.read.csv("Datasets/olist_customers_dataset.csv", header=True, inferSchema=True)
geolocation_df = spark.read.csv("Datasets/olist_geolocation_dataset.csv", header=True, inferSchema=True)
item_df = spark.read.csv("Datasets/olist_order_items_dataset.csv", header=True, inferSchema=True)
payments_df = spark.read.csv("Datasets/olist_order_payments_dataset.csv", header=True, inferSchema=True)
orders_df = spark.read.csv("Datasets/olist_orders_dataset.csv", header=True, inferSchema=True)
products_df = spark.read.csv("Datasets/olist_products_dataset.csv", header=True, inferSchema=True)
sellers_df = spark.read.csv("Datasets/olist_sellers_dataset.csv", header=True, inferSchema=True)
product_category_df = spark.read.csv("Datasets/product_category_name_translation.csv", header=True, inferSchema=True)

                                                                                

In [8]:
from pyspark.sql.functions import col, count, when

# Check for null values
print("Initial count of null values:")
customers_df.select([count(when(col(c).isNull(), c)).alias(c) for c in customers_df.columns]).show()

# Remove duplicates
print("Number of rows before removing the duplicates:", orders_df.count())
customers_df.dropDuplicates()
print("Number of rows after removing the duplicates:", orders_df.count())

customers_df.printSchema()

Initial count of null values:


                                                                                

+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|          0|                 0|                       0|            0|             0|
+-----------+------------------+------------------------+-------------+--------------+

Number of rows before removing the duplicates: 99441
Number of rows after removing the duplicates: 99441
root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [9]:
# Check for null values
print("Initial count of null values:")
item_df.select([count(when(col(c).isNull(), c)).alias(c) for c in item_df.columns]).show()

# Remove duplicates
print("Number of rows before removing the duplicates:", item_df.count())
item_df.dropDuplicates()
print("Number of rows after removing the duplicates:", item_df.count())

item_df.printSchema()

# Seprate numerical and categorical column
numerical_cols = [col_name for col_name, dtype in item_df.dtypes if dtype in ['int', 'double']]
categorical_cols = [col_name for col_name, dtype in item_df.dtypes if dtype == 'string']

print("Numerical columns:", numerical_cols)
print("Categorical columns:", categorical_cols)

# Display summary statistics of numerical column
item_df.select(numerical_cols).describe().show()

Initial count of null values:


                                                                                

+--------+-------------+----------+---------+-------------------+-----+-------------+
|order_id|order_item_id|product_id|seller_id|shipping_limit_date|price|freight_value|
+--------+-------------+----------+---------+-------------------+-----+-------------+
|       0|            0|         0|        0|                  0|    0|            0|
+--------+-------------+----------+---------+-------------------+-----+-------------+

Number of rows before removing the duplicates: 112650
Number of rows after removing the duplicates: 112650
root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)

Numerical columns: ['order_item_id', 'price', 'freight_value']
Categorical columns: ['order_id', 'product_id', 'seller_id']
+-------+------------------

In [10]:
# Calculate total_item_value
item_df = item_df.withColumn("total_item_value", col("order_item_id") * col("price"))

# Calculate total_freight_value
item_df = item_df.withColumn("total_freight_value", col("order_item_id") * col("freight_value"))

# Calculate total_order_value
item_df = item_df.withColumn("total_order_value", col("total_item_value") + col("total_freight_value"))

# Show the updated DataFrame with new columns
item_df.show()

item_df.printSchema()

+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+----------------+-------------------+------------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|total_item_value|total_freight_value| total_order_value|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+----------------+-------------------+------------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|        13.29|            58.9|              13.29|             72.19|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13| 239.9|        19.93|           239.9|              19.93|            259.83|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30| 199.0|        17.87| 

In [11]:
# Check for null values
print("Initial count of null values:")
geolocation_df.select([count(when(col(c).isNull(), c)).alias(c) for c in geolocation_df.columns]).show()

# Remove duplicates
print("Number of rows before removing the duplicates:", geolocation_df.count())
geolocation_df.dropDuplicates()
print("Number of rows after removing the duplicates:", geolocation_df.count())

geolocation_df.printSchema()

Initial count of null values:


                                                                                

+---------------------------+---------------+---------------+----------------+-----------------+
|geolocation_zip_code_prefix|geolocation_lat|geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+---------------+---------------+----------------+-----------------+
|                          0|              0|              0|               0|                0|
+---------------------------+---------------+---------------+----------------+-----------------+

Number of rows before removing the duplicates: 1000163
Number of rows after removing the duplicates: 1000163
root
 |-- geolocation_zip_code_prefix: integer (nullable = true)
 |-- geolocation_lat: double (nullable = true)
 |-- geolocation_lng: double (nullable = true)
 |-- geolocation_city: string (nullable = true)
 |-- geolocation_state: string (nullable = true)



In [12]:
# Check for null values
print("Initial count of null values:")
payments_df.select([count(when(col(c).isNull(), c)).alias(c) for c in payments_df.columns]).show()

# Remove duplicates
print("Number of rows before removing the duplicates:", payments_df.count())
payments_df.dropDuplicates()
print("Number of rows after removing the duplicates:", payments_df.count())

payments_df.printSchema()

# Seprate numerical and categorical column
numerical_cols = [col_name for col_name, dtype in payments_df.dtypes if dtype in ['int', 'double']]
categorical_cols = [col_name for col_name, dtype in payments_df.dtypes if dtype == 'string']

print("Numerical columns:", numerical_cols)
print("Categorical columns:", categorical_cols)

# Display summary statistics of numerical column
payments_df.select(numerical_cols).describe().show()

Initial count of null values:


                                                                                

+--------+------------------+------------+--------------------+-------------+
|order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------+------------------+------------+--------------------+-------------+
|       0|                 0|           0|                   0|            0|
+--------+------------------+------------+--------------------+-------------+

Number of rows before removing the duplicates: 103886
Number of rows after removing the duplicates: 103886
root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)

Numerical columns: ['payment_sequential', 'payment_installments', 'payment_value']
Categorical columns: ['order_id', 'payment_type']
+-------+------------------+--------------------+------------------+
|summary|payment_sequential|payment_installments|     payment

In [13]:
# Filter rows where payment_sequential is less than or equal to 5
payments_df = payments_df.filter(col("payment_sequential") <= 5)

# Show filtered data
print("Filtered data:")
payments_df.show()

Filtered data:
+--------------------+------------------+------------+--------------------+-------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------+------------------+------------+--------------------+-------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|        99.33|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|        65.71|
|ba78997921bbcdc13...|                 1| credit_card|                   8|       107.78|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|       128.45|
|298fcdf1f73eb413e...|                 1| credit_card|                   2|        96.12|
|771ee386b001f0620...|                 1| credit_card|                   1|        81.16|
|3d7239c394a212faa...|                 1| credit_card|                   3|        51

In [14]:
# Number of rows
print("Number of rows:", orders_df.count())
# Number of columns
print("Number of columns:", len(orders_df.columns))

# Check for null values
print("Initial count of null values:")
orders_df.select([count(when(col(c).isNull(), c)).alias(c) for c in orders_df.columns]).show()

# Remove null values
orders_df = orders_df.na.drop()

# Check for null values again
print("Count of null values after removal:")
orders_df.select([count(when(col(c).isNull(), c)).alias(c) for c in orders_df.columns]).show()

# Remove duplicates
print("Number of rows before removing the duplicates:", orders_df.count())
orders_df.dropDuplicates()
print("Number of rows after removing the duplicates:", orders_df.count())

orders_df.printSchema()

Number of rows: 99441
Number of columns: 8
Initial count of null values:
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|order_id|customer_id|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|       0|          0|           0|                       0|              160|                        1783|                         2965|                            0|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+

Count of null values after removal:
+--------+-----------+------------+---------------

In [15]:
# Number of rows
print("Number of rows:", products_df.count())
# Number of columns
print("Number of columns:", len(products_df.columns))

# Check for null values
print("Initial count of null values:")
products_df.select([count(when(col(c).isNull(), c)).alias(c) for c in products_df.columns]).show()

# Remove null values
products_df = products_df.na.drop()

# Check for null values again
print("Count of null values after removal:")
products_df.select([count(when(col(c).isNull(), c)).alias(c) for c in products_df.columns]).show()

# Remove duplicates
print("Number of rows before removing the duplicates:", products_df.count())
products_df.dropDuplicates()
print("Number of rows after removing the duplicates:", products_df.count())

# Check the schema of merged_products_items
products_df.printSchema()

# Example: Normalize numerical columns in merged_products_items
assembler = VectorAssembler(inputCols=["product_weight_g", "product_length_cm", "product_height_cm", "product_width_cm"],
                            outputCol="features")

# Handle missing values if any
products_df = products_df.na.fill(0.0)  # Fill missing values with 0.0 or appropriate strategy

scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")

pipeline = Pipeline(stages=[assembler, scaler])

# Fit pipeline to data and transform
try:
    normalized_merged_products_items = pipeline.fit(products_df).transform(products_df)
except Exception as e:
    print(f"Error during pipeline execution: {e}")

# Show first few rows of normalized DataFrame
normalized_merged_products_items.show()

Number of rows: 32951
Number of columns: 9
Initial count of null values:
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|         0|                  610|                610|                       610|               610|               2|                2|                2|               2|
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+

Count of null values after removal:
+----------+---------------------+-

                                                                                

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+--------------------+--------------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|            features|     scaled_features|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+--------------------+--------------------+
|1e9e8ef04dbcff454...|           perfumaria|                 40|                       287|                 1|             225|               16|               10|              14|[225.0,16.0,10.0,...|[0.00556586270871...|
|3aa071139cb16b67c...|                artes|                 44|                       276|                 

In [16]:
# Seprate numerical and categorical column
numerical_cols = [col_name for col_name, dtype in products_df.dtypes if dtype in ['int', 'double']]
categorical_cols = [col_name for col_name, dtype in products_df.dtypes if dtype == 'string']

print("Numerical columns:", numerical_cols)
print("Categorical columns:", categorical_cols)

# Display summary statistics of numerical column
products_df.select(numerical_cols).describe().show()

Numerical columns: ['product_name_lenght', 'product_description_lenght', 'product_photos_qty', 'product_weight_g', 'product_length_cm', 'product_height_cm', 'product_width_cm']
Categorical columns: ['product_id', 'product_category_name']


24/06/27 22:32:54 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+-------------------+--------------------------+------------------+-----------------+------------------+------------------+------------------+
|summary|product_name_lenght|product_description_lenght|product_photos_qty| product_weight_g| product_length_cm| product_height_cm|  product_width_cm|
+-------+-------------------+--------------------------+------------------+-----------------+------------------+------------------+------------------+
|  count|              32340|                     32340|             32340|            32340|             32340|             32340|             32340|
|   mean|  48.47659245516388|         771.4923933209648|2.1889610389610388|2276.956586270872|30.854545454545455|16.958812615955473|23.208596165739024|
| stddev| 10.245698759400796|         635.1248313619135| 1.736786634688266|4279.291844890453|16.955964727614173|13.636115015503991|12.078762221674598|
|    min|                  5|                         4|                 1|                0| 

In [17]:
# Number of rows
print("Number of rows:", sellers_df.count())
# Number of columns
print("Number of columns:", len(sellers_df.columns))

# Check for null values
print("Initial count of null values:")
sellers_df.select([count(when(col(c).isNull(), c)).alias(c) for c in sellers_df.columns]).show()

# Remove duplicates
print("Number of rows before removing the duplicates:", sellers_df.count())
sellers_df.dropDuplicates()
print("Number of rows after removing the duplicates:", sellers_df.count())

sellers_df.printSchema()

Number of rows: 3095
Number of columns: 4
Initial count of null values:
+---------+----------------------+-----------+------------+
|seller_id|seller_zip_code_prefix|seller_city|seller_state|
+---------+----------------------+-----------+------------+
|        0|                     0|          0|           0|
+---------+----------------------+-----------+------------+

Number of rows before removing the duplicates: 3095
Number of rows after removing the duplicates: 3095
root
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)



In [18]:
# Number of rows
print("Number of rows:", product_category_df.count())
# Number of columns
print("Number of columns:", len(product_category_df.columns))

# Check for null values
print("Initial count of null values:")
product_category_df.select([count(when(col(c).isNull(), c)).alias(c) for c in product_category_df.columns]).show()

# Remove duplicates
print("Number of rows before removing the duplicates:", product_category_df.count())
product_category_df.dropDuplicates()
print("Number of rows after removing the duplicates:", product_category_df.count())

product_category_df.printSchema()

Number of rows: 71
Number of columns: 2
Initial count of null values:
+---------------------+-----------------------------+
|product_category_name|product_category_name_english|
+---------------------+-----------------------------+
|                    0|                            0|
+---------------------+-----------------------------+

Number of rows before removing the duplicates: 71
Number of rows after removing the duplicates: 71
root
 |-- product_category_name: string (nullable = true)
 |-- product_category_name_english: string (nullable = true)



**Data Loading to SQL**


In [20]:
from sqlalchemy import create_engine

database_name = "olist_data"
user = "root"
password = "pruthvi12"
host = "localhost"
port = "3306"

engine = create_engine(f'mysql+pymysql://{user}:{password}@{host}:{port}/{database_name}')

# List of DataFrames to load
dataframes = {
    'customers': customers_df,
    'geolocation': geolocation_df,
    'item': item_df,
    'payments': payments_df,
    'orders': orders_df,
    'products': products_df,
    'sellers': sellers_df,
    'product_category': product_category_df
}

# Iterate through each DataFrame and write to MySQL
for df_name, df in dataframes.items():
    try:
        # Convert Spark DataFrame to Pandas DataFrame
        pandas_df = df.toPandas()
        
        # Write DataFrame to MySQL table
        table_name = df_name
        pandas_df.to_sql(name=table_name, con=engine, if_exists='replace', index=False)
        
        print(f"DataFrame '{df_name}' loaded into MySQL table '{table_name}' successfully.")
    
    except Exception as e:
        print(f"Error loading DataFrame '{df_name}' into MySQL table:")
        print(e)


DataFrame 'customers' loaded into MySQL table 'customers' successfully.


Exception in thread "serve-DataFrame" java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:701)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:745)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:695)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:660)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:636)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:582)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:541)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:65)


DataFrame 'geolocation' loaded into MySQL table 'geolocation' successfully.
DataFrame 'item' loaded into MySQL table 'item' successfully.
DataFrame 'payments' loaded into MySQL table 'payments' successfully.
DataFrame 'orders' loaded into MySQL table 'orders' successfully.
DataFrame 'products' loaded into MySQL table 'products' successfully.
DataFrame 'sellers' loaded into MySQL table 'sellers' successfully.
DataFrame 'product_category' loaded into MySQL table 'product_category' successfully.


24/06/28 07:07:13 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 527649 ms exceeds timeout 120000 ms
24/06/28 07:07:14 WARN SparkContext: Killing executors is not supported by current scheduler.
24/06/28 07:07:14 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at o