In [23]:
import time


# Wall-clock start of the run; the final cell subtracts this to report elapsed time.
start_time = time.time()


In [24]:
# Initialization: point this kernel at the local Spark installation.
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = f'{os.environ["SPARK_HOME"]}/python/lib'

# Interpreters used by PySpark; switch both to /usr/bin/python2.7 to run on Python 2.
os.environ.update(
    PYSPARK_PYTHON="/usr/bin/python3.6",
    PYSPARK_DRIVER_PYTHON="/usr/bin/python3",
)

# Put the archives shipped with Spark at the front of the import path
# (pyspark.zip first, then the py4j sources).
sys.path[:0] = [
    f'{os.environ["PYLIB"]}/pyspark.zip',
    f'{os.environ["PYLIB"]}/py4j-0.10.7-src.zip',
]

# NOTE: List any extra Spark packages you need here.
#os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'


In [25]:
#Entrypoint 2.x
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import StringType
from pyspark.sql.functions import col,when,create_map, lit,regexp_replace,isnull,trim
from itertools import chain
import pyspark.sql.functions as F

# Create a SparkConf object to configure Spark properties
conf = SparkConf()

# Resource and parallelism settings. The executor-level settings are kept
# commented out as optional knobs.
#conf.set("spark.executor.instances", "2")
# NOTE(review): in client mode the driver JVM may already be running when this
# is applied, in which case the value has no effect - confirm, or pass
# --driver-memory via PYSPARK_SUBMIT_ARGS instead.
conf.set("spark.driver.memory", "1g")
#conf.set("spark.executor.memory", "1g")
#conf.set("spark.executor.cores", "2")
conf.set("spark.sql.shuffle.partitions", "3")
# "spark.default.parallelism" is the property Spark reads for the default RDD
# partition count; "spark.sparkContext.defaultParallelism" is not a Spark
# property, so setting it was silently ignored.
conf.set("spark.default.parallelism", "3")

# Create a SparkSession with the configured SparkConf: local mode with three
# worker threads and Hive support enabled.
spark = (
    SparkSession.builder
    .appName("Spark ETL")
    .master("local[3]")
    .config("spark.submit.deployMode", "client")
    .config(conf=conf)
    .enableHiveSupport()
    .getOrCreate()
)

sc = spark.sparkContext



In [27]:
# Connection details for the MySQL source database.
# Each value can be overridden through an environment variable so that real
# credentials do not have to be written into the notebook. The literals below
# are only fallbacks that keep the notebook runnable as before.
# NOTE(review): consider removing the fallback password once the environment
# variables are provisioned.
url = os.environ.get(
    "RETAIL_DB_URL",
    "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&allowPublicKeyRetrieval=true",
)
driver = os.environ.get("RETAIL_DB_DRIVER", "com.mysql.jdbc.Driver")
user = os.environ.get("RETAIL_DB_USER", "bigdata")
password = os.environ.get("RETAIL_DB_PASSWORD", "Bigdata@123")


In [28]:
import logging

# Write ERROR-level (and above) records to error.log as
# "<timestamp> - <level> - <message>".
logging.basicConfig(
    filename='error.log',
    level=logging.ERROR,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

def ingest_table(dbtable):
    """
    Read a table from the configured database over JDBC.

    Connection settings are taken from the module-level ``driver``, ``url``,
    ``user`` and ``password`` variables.

    Args:
        dbtable (str): Value for the JDBC ``dbtable`` option, e.g.
            ``"project.customers"``.

    Returns:
        DataFrame or None: The table contents with the row returned by
        ``df.limit(1)`` removed, or ``None`` if reading raised (the error is
        logged). Callers must check for ``None``.
    """
    try:
        df = (
            spark.read
            .format("jdbc")
            .option("driver", driver)
            .option("url", url)
            .option("user", user)
            .option("password", password)
            .option("dbtable", dbtable)
            .load()
        )
        # NOTE(review): this removes one row - presumably a header line that
        # was loaded into the table as data. No ordering is applied before
        # limit(1), so which row is taken is not guaranteed; confirm.
        return df.exceptAll(df.limit(1))
    except Exception as e:
        # logging.exception keeps the original message and also records the
        # traceback, which a plain logging.error call would drop.
        logging.exception("Error ingesting table %s: %s", dbtable, e)
        return None
        
        
df_customers = ingest_table("project.customers")
df_payments = ingest_table("project.payments")

# ingest_table logs the error and returns None on failure. Stop here with a
# clear message instead of failing later with an AttributeError on None.
if df_customers is None or df_payments is None:
    raise RuntimeError(
        "Failed to ingest project.customers and/or project.payments; see error.log"
    )
print(df_customers.rdd.getNumPartitions())

4


In [29]:
def ingest_csv(file_path):
    """
    Read a CSV file into a DataFrame.

    The file is read with the ``header`` and ``inferSchema`` options enabled.

    Args:
        file_path (str): Path or URI of the CSV file, e.g.
            ``"file:///home/talentum/project/orders.csv"``.

    Returns:
        DataFrame or None: The loaded data, or ``None`` if reading raised
        (the error is logged). Callers must check for ``None``.
    """
    try:
        return (
            spark.read
            .format("csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .load(file_path)
        )
    except Exception as e:
        # logging.exception keeps the original message and also records the
        # traceback, which a plain logging.error call would drop.
        logging.exception("Error ingesting CSV file %s: %s", file_path, e)
        return None
        
        
df_orders = ingest_csv("file:///home/talentum/project/orders.csv")
df_order_items = ingest_csv("file:///home/talentum/project/order_items.csv")
df_products = ingest_csv("file:///home/talentum/project/products.csv")

# ingest_csv logs the error and returns None on failure. Stop here with a
# clear message instead of failing later with an AttributeError on None.
if any(frame is None for frame in (df_orders, df_order_items, df_products)):
    raise RuntimeError("Failed to ingest one or more CSV files; see error.log")
print(df_orders.rdd.getNumPartitions())

4


In [30]:
# The product weight column is not selected into the final table.
df_products = df_products.drop("product_weight_g")

In [31]:
def delete_rows_by_order_status(df, statuses_to_delete):
    """
    Drop every row whose ``order_status`` is one of the given statuses.

    Args:
        df (DataFrame): Frame with an ``order_status`` column.
        statuses_to_delete (list): Status values whose rows are removed.

    Returns:
        DataFrame: The frame returned by filtering ``df`` down to rows whose
        status is not in ``statuses_to_delete``.
    """
    has_unwanted_status = df["order_status"].isin(statuses_to_delete)
    return df.filter(~has_unwanted_status)

# Remove orders in any of these statuses.
df_orders = delete_rows_by_order_status(
    df_orders,
    ["approved", "created", "invoiced", "processing", "shipped", "unavailable"],
)


In [32]:


# Drop orders marked as delivered that have no delivery timestamp.
df_orders = df_orders.filter(
    ~((col("order_status") == "delivered") & col("order_delivered_timestamp").isNull())
)

# Replace the remaining null values in "order_delivered_timestamp" with a fixed placeholder date.
fixed_date = '01-01-2000 00:00'
df_orders = df_orders.withColumn(
    "order_delivered_timestamp",
    when(col("order_delivered_timestamp").isNull(), lit(fixed_date))
    .otherwise(col("order_delivered_timestamp")),
)


In [33]:
def remove_rows_with_nulls(df_v):
    """
    Drop every row that has a null in at least one column.

    Args:
        df_v (DataFrame): The input DataFrame.

    Returns:
        DataFrame: The frame returned after rows containing nulls are dropped.
    """
    return df_v.dropna()


# Apply the null-row cleanup to every input frame.
df_customers, df_orders, df_order_items, df_payments, df_products = (
    remove_rows_with_nulls(frame)
    for frame in (df_customers, df_orders, df_order_items, df_payments, df_products)
)


In [34]:

# Cast the state code to string and strip all whitespace from it.
df_customers = df_customers.withColumn(
    "customer_state",
    regexp_replace(col("customer_state").cast("string"), "\\s+", ""),
)

In [35]:

# Two-letter state code -> full state name.
dictionary = {
    "SP": "São Paulo",
    "SC": "Santa Catarina",
    "MG": "Minas Gerais",
    "PR": "Paraná",
    "RJ": "Rio de Janeiro",
    "RS": "Rio Grande do Sul",
    "PA": "Pará",
    "GO": "Goiás",
    "ES": "Espírito Santo",
    "BA": "Bahia",
    "MA": "Maranhão",
    "MS": "Mato Grosso do Sul",
    "CE": "Ceará",
    "DF": "Distrito Federal",
    "RN": "Rio Grande do Norte",
    "PE": "Pernambuco",
    "MT": "Mato Grosso",
    "AM": "Amazonas",
    "AP": "Amapá",
    "AL": "Alagoas",
    "RO": "Rondônia",
    "PB": "Paraíba",
    "TO": "Tocantins",
    "PI": "Piauí",
    "AC": "Acre",
    "SE": "Sergipe",
    "RR": "Roraima",
}




In [36]:

  
# Flatten the lookup into alternating key/value literals
# (key1, value1, key2, value2, ...) and build a map column from them.
mapping_expr = create_map([lit(token) for pair in dictionary.items() for token in pair])

# Add a "state" column by looking up each customer's state code in the map.
df_customers = df_customers.withColumn("state", mapping_expr[col("customer_state")])


In [37]:
# Raw product category name -> broader display category.
# Several raw names map onto the same category (e.g. "Furniture", "Fashion").
# NOTE(review): some keys look misspelled ("costruction_tools_tools",
# "fashio_female_clothing", "home_confort"); presumably they mirror the raw
# values in the products data - verify before "fixing" them, since the lookup
# is an exact match on the key.
dictionary_product={
  "perfumery": "Health and Beauty",
  "art": "Art",
  "sports_leisure": "Sports and Leisure",
  "baby": "Baby",
  "housewares": "Housewares",
  "musical_instruments": "Musical Instruments",
  "cool_stuff": "Cool Stuff",
  "furniture_decor": "Furniture",
  "home_appliances": "Home Appliances",
  "toys": "Toys",
  "bed_bath_table": "Homewares",
  "construction_tools_safety": "Construction Tools",
  "computers_accessories": "Electronics",
  "health_beauty": "Health and Beauty",
  "luggage_accessories": "Travel",
  "garden_tools": "Garden",
  "office_furniture": "Furniture",
  "auto": "Automotive",
  "electronics": "Electronics",
  "fashion_shoes": "Fashion",
  "telephony": "Telephony",
  "stationery": "Stationery",
  "fashion_bags_accessories": "Fashion",
  "computers": "Electronics",
  "home_construction": "Home Improvement",
  "watches_gifts": "Gifts",
  "construction_tools_construction": "Construction Tools",
  "pet_shop": "Pets",
  "small_appliances": "Home Appliances",
  "agro_industry_and_commerce": "Business",
  "NA": "Others",
  "furniture_living_room": "Furniture",
  "signaling_and_security": "Security",
  "air_conditioning": "Home Appliances",
  "consoles_games": "Entertainment",
  "books_general_interest": "Books",
  "costruction_tools_tools": "Construction Tools",
  "fashion_underwear_beach": "Fashion",
  "fashion_male_clothing": "Fashion",
  "kitchen_dining_laundry_garden_furniture": "Furniture",
  "industry_commerce_and_business": "Business",
  "fixed_telephony": "Telephony",
  "construction_tools_lights": "Construction Tools",
  "books_technical": "Books",
  "home_appliances_2": "Home Appliances",
  "party_supplies": "Others",
  "drinks": "Food and Drink",
  "market_place": "Others",
  "la_cuisine": "Home Appliances",
  "costruction_tools_garden": "Garden",
  "fashio_female_clothing": "Fashion",
  "home_confort": "Home Improvement",
  "audio": "Electronics",
  "food_drink": "Food and Drink",
  "music": "Entertainment",
  "food": "Food and Drink",
  "tablets_printing_image": "Electronics",
  "books_imported": "Books",
  "small_appliances_home_oven_and_coffee": "Home Appliances",
  "fashion_sport": "Sports and Leisure",
  "christmas_supplies": "Others",
  "fashion_childrens_clothes": "Fashion",
  "dvds_blu_ray": "Entertainment",
  "arts_and_craftmanship": "Arts and Crafts",
  "furniture_bedroom": "Furniture",
  "cine_photo": "Entertainment",
  "diapers_and_hygiene": "Baby",
  "flowers": "Home and Garden",
  "home_comfort_2": "Home Improvement",
  "security_and_services": "Security",
  "furniture_mattress_and_upholstery": "Furniture"
}


In [38]:
# Build a map column from the product-category lookup (alternating key/value literals).
mapping_expr1 = create_map(
    [lit(token) for pair in dictionary_product.items() for token in pair]
)

# Add a "category" column by looking up each product's raw category name.
df_products = df_products.withColumn(
    "category",
    mapping_expr1[col("product_category_name")],
)

In [39]:
# Drop the raw code/name columns now that "state" and "category" have been derived from them.
df_customers = df_customers.drop("customer_state")
df_products = df_products.drop("product_category_name")

In [40]:

# Cast each of the four order timestamp columns to string and remove
# leading and trailing spaces.
df_orders = (
    df_orders
    .withColumn("order_purchase_timestamp", trim(col("order_purchase_timestamp").cast("string")))
    .withColumn("order_approved_at", trim(col("order_approved_at").cast("string")))
    .withColumn("order_delivered_timestamp", trim(col("order_delivered_timestamp").cast("string")))
    .withColumn("order_estimated_delivery_date", trim(col("order_estimated_delivery_date").cast("string")))
)

In [41]:
from pyspark.sql.functions import col

# Join customers (t1) -> orders (t2) -> payments (t3) -> order items (t4)
# -> products (t5) and keep the columns needed for the output table.
# NOTE(review): payments and order items are both joined on order_id, so an
# order with several payments and several items produces one row per
# payment/item combination - confirm downstream totals account for this.
df_final = (
    df_customers.alias("t1")
    .join(df_orders.alias("t2"), on=col("t1.customer_id") == col("t2.customer_id"))
    .join(df_payments.alias("t3"), on=col("t2.order_id") == col("t3.order_id"))
    .join(df_order_items.alias("t4"), on=col("t3.order_id") == col("t4.order_id"))
    .join(df_products.alias("t5"), on=col("t4.product_id") == col("t5.product_id"))
    .select([
        # customer / order identifiers and customer attributes
        't1.customer_id',
        't4.order_id',
        't1.customer_zip_code_prefix',
        't1.customer_city',
        't1.state',
        # order attributes
        't2.order_status',
        't2.order_purchase_timestamp',
        't2.order_approved_at',
        't2.order_delivered_timestamp',
        't2.order_estimated_delivery_date',
        # payment attributes
        't3.payment_sequential',
        't3.payment_type',
        't3.payment_installments',
        't3.payment_value',
        # order item attributes
        't4.order_item_id',
        't4.seller_id',
        't4.price',
        't4.shipping_charges',
        # product attributes
        't5.product_id',
        't5.category',
        't5.product_length_cm',
        't5.product_height_cm',
        't5.product_width_cm',
    ])
)



In [42]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

# Custom schema for the output columns: one nullable field per
# (column name, type) pair, in output column order.
custom_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("customer_id", StringType()),
        ("order_id", StringType()),
        ("customer_zip_code_prefix", IntegerType()),
        ("customer_city", StringType()),
        ("state", StringType()),
        ("order_status", StringType()),
        ("order_purchase_timestamp", TimestampType()),
        ("order_approved_at", TimestampType()),
        ("order_delivered_timestamp", TimestampType()),
        ("order_estimated_delivery_date", TimestampType()),
        ("payment_sequential", IntegerType()),
        ("payment_type", StringType()),
        ("payment_installments", IntegerType()),
        ("payment_value", DoubleType()),
        ("order_item_id", IntegerType()),
        ("seller_id", StringType()),
        ("price", DoubleType()),
        ("shipping_charges", DoubleType()),
        ("product_id", StringType()),
        ("category", StringType()),
        ("product_length_cm", DoubleType()),
        ("product_height_cm", DoubleType()),
        ("product_width_cm", DoubleType()),
    ]
])




In [43]:

# Overwrite the "retail" table, partitioned by order status, with its files
# stored under /home/talentum/retail_data.
# NOTE(review): saveAsTable treats extra keyword arguments as writer options,
# so `schema=` most likely does not cast any columns - check the written
# column types (the timestamp columns were cast to string earlier) and add
# explicit casts if the custom schema is meant to be enforced.
(
    df_final.write
    .mode("overwrite")
    .partitionBy("order_status")
    .option("path", "/home/talentum/retail_data")
    .saveAsTable("retail", schema=custom_schema)
)

In [44]:
# Report the wall-clock time elapsed since start_time was recorded.

end_time = time.time()
elapsed_time = end_time - start_time

print(f"Time taken: {elapsed_time} seconds")

Time taken: 25.575178861618042 seconds
