<a href="https://colab.research.google.com/github/horaciobaptista/spark/blob/main/Trabalho_BigData.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **eCommerce purchase history from electronics store**
### This dataset contains 2.6M purchased products from online store
### Colunas: event_time, order_id, product_id, category_id, category_code, brand, price, user_id
#### https://www.kaggle.com/datasets/mkechinov/ecommerce-purchase-history-from-electronics-store


***Exploração de dados com RDD's***

In [None]:
# Instalar o PySpark:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
# Importar livrarias e criar o SparkContext:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("Data Exploration")
sc = SparkContext(conf=conf)

In [None]:
# Carregar o dataset como um RDD:
dataset_rdd = sc.textFile("kz_dataset.csv")

In [None]:
# Analisar os dados CSV e dividir em colunas individuais:
header = dataset_rdd.first()  # Extract the header
data_rdd = dataset_rdd.filter(lambda line: line != header)  # Remove the header from RDD

# Dividir cada linha em colunas
columns_rdd = data_rdd.map(lambda line: line.split(","))


##### Realizar operações de exploração e análise de dados no RDD:

In [None]:
# Contar o número total de linha no dataset:
total_rows = columns_rdd.count()
print("Total linhas:", total_rows)

Total linhas: 2633521


In [None]:
# Extrair a coluna product_id:
product_id_rdd = columns_rdd.map(lambda row: row[2])

In [None]:
# Calculate the average price:
price_rdd = columns_rdd.map(lambda row: float(row[6]))
# average_price = price_rdd.mean()
# print("Average price:", average_price)

In [None]:
# Group data by a specific column and count the occurrences:
# Example: Group data by 'category_id' and count the occurrences
category_count_rdd = columns_rdd.map(lambda row: (row[3], 1)).reduceByKey(lambda a, b: a + b)
category_count = category_count_rdd.collect()
print("Category count:", category_count)

Category count: [('2268105430162997728', 66146), ('2268105402774193030', 13645), ('2268105442242593506', 27564), ('2268105409938063394', 2243), ('2268105428594328000', 13271), ('2268105390484882084', 34887), ('2268105654591816244', 1781), ('2268105402556089218', 10769), ('2268105440640369344', 4177), ('2309018576985522845', 61), ('2268105444734010134', 12302), ('2268105428946649544', 158), ('2268105419375247608', 24940), ('2268105408679772166', 3640), ('2268105440824918724', 3906), ('2268105439726011052', 5390), ('2268105439516295848', 8231), ('2268105440137052852', 7808), ('2268105406750393304', 11257), ('2268105419752734976', 368), ('2268105658173751944', 1936), ('2268105442049655518', 22762), ('2268105443970646790', 906), ('2268105404972008370', 610), ('2268105390174503582', 7547), ('2309018576893248155', 357), ('2268105442427142886', 16085), ('2268105404258976676', 1002), ('2268105445203772192', 3487), ('2268105419173921012', 507), ('2268105426723668374', 884), ('226810539071976311

In [None]:
# Apply filtering based on a condition:
# Example: Filter data where price is greater than 100
filtered_rdd = columns_rdd.filter(lambda row: float(row[6]) > 100)

In [None]:
# Find the maximum price:
max_price = price_rdd.max()
print("Maximum price:", max_price)

In [None]:
# Stop the SparkContext:
sc.stop()

*** Exploração de dados com SparkSQL***

In [12]:
# Import the necessary libraries and create a SparkSession:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Data Analysis").getOrCreate()

In [14]:
# Read the CSV dataset into a DataFrame:
dataset_df = spark.read.csv("kz_dataset.csv", header=True, inferSchema=True)


#### Perform data exploration and analysis operations using Spark SQL:

In [15]:
# Display the schema of the DataFrame:
dataset_df.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- order_id: long (nullable = true)
 |-- product_id: long (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: long (nullable = true)



In [16]:
# Count the total number of rows in the dataset:
total_rows = dataset_df.count()
print("Total rows:", total_rows)

Total rows: 2633521


In [None]:
# Example: Extract the 'product_id' column
product_id_df = dataset_df.select("product_id")
product_id_df.show()


In [18]:
# Calculate the average price:
average_price = dataset_df.selectExpr("avg(price) as average_price").first()[0]
print("Average price:", average_price)

Average price: 154.09316532715627


In [None]:
# Group data by a specific column and count the occurrences:
# Example: Group data by 'category_id' and count the occurrences
category_count_df = dataset_df.groupBy("category_id").count()
category_count_df.show()

In [None]:
# Apply filtering based on a condition:
# Example: Filter data where price is greater than 100
filtered_df = dataset_df.filter(dataset_df.price > 100)
filtered_df.show()

In [21]:
# Find the maximum price:
max_price = dataset_df.agg({"price": "max"}).first()[0]
print("Maximum price:", max_price)

Maximum price: 50925.9


In [22]:
# Stop the SparkContext:
spark.stop()