In [45]:
import os
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, regexp_replace,when, lit,udf
from pyspark.sql.types import FloatType
from pyspark import SparkConf,SparkContext
import pyspark.sql.functions as F 
from typing import Callable
from pyspark.sql import DataFrame
from pyspark.ml.feature import StringIndexer, OneHotEncoder,VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.window import Window
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import countDistinct
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [23]:
findspark.init("D:\spark")

In [24]:
os.environ["SPARK_HOME"]=r"C:\spark"
os.environ["HADOOP_HOME"]=r"C:\spark\hadoop\bin"
os.environ["JAVA_HOME"]=r"C:\Program Files\Java\jdk-20"

In [46]:


conf = SparkConf()
conf.set("spark.app.name","SparkApp")
conf.set("spark.master","local[*]")
spark = SparkSession.builder\
    .config(conf=conf)\
    .appName("teste")\
    .getOrCreate()


sc = spark.sparkContext

file_path = r"C:\spyderteste\dataseths\data_superstore.csv"

df = spark.read.csv(file_path, header=True, inferSchema=True, sep=";")


In [26]:

data = df.select(
"OrderID", "ShipDate", "CustomerID", "City", "Sub-Category", "ProductName","ShipMode",
    F.regexp_replace("Sales", ",", ".").cast(FloatType()).alias("Sales"),
    "Quantity",
    F.regexp_replace("Discount", ",", ".").cast(FloatType()).alias("Discount")
).where("ShipMode = 'Second Class'")
valores = df.groupBy("Category").agg(
    F.count("OrderID").alias("CountOrderID")
)


In [27]:

columns_to_index = ['OrderID', 'ShipDate', 'CustomerID', 'City', 'Sub-Category', 'ProductName', 'ShipMode', 'Sales', 'Quantity', 'Discount']


indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_index").fit(df) for col in columns_to_index]


for indexer in indexers:
    df = indexer.transform(df)

df.show()


+-----+--------------+----------+----------+--------------+----------+------------------+-----------+-------------+---------------+--------------+----------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+--------+-------------+--------------+----------------+----------+------------------+-----------------+--------------+-----------+--------------+--------------+
|RowID|       OrderID| OrderDate|  ShipDate|      ShipMode|CustomerID|      CustomerName|    Segment|      Country|           City|         State|PostalCode| Region|      ProductID|       Category|Sub-Category|         ProductName|   Sales|Quantity|Discount|  Profit|OrderID_index|ShipDate_index|CustomerID_index|City_index|Sub-Category_index|ProductName_index|ShipMode_index|Sales_index|Quantity_index|Discount_index|
+-----+--------------+----------+----------+--------------+----------+------------------+-----------+-------------+---------------+--------------+----------+-----

In [28]:

all_columns = df.columns

indexed_columns = [col_name for col_name in all_columns if "_index" in col_name]

indexed_columns_encod = [col_name + "_encoded" for col_name in indexed_columns]

onehot = OneHotEncoder(inputCols=indexed_columns, outputCols=indexed_columns_encod)

modelo = onehot.fit(df)

one = modelo.transform(df)

one.show()


+-----+--------------+----------+----------+--------------+----------+------------------+-----------+-------------+---------------+--------------+----------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+--------+-------------+--------------+----------------+----------+------------------+-----------------+--------------+-----------+--------------+--------------+---------------------+----------------------+------------------------+------------------+--------------------------+-------------------------+----------------------+-------------------+----------------------+----------------------+
|RowID|       OrderID| OrderDate|  ShipDate|      ShipMode|CustomerID|      CustomerName|    Segment|      Country|           City|         State|PostalCode| Region|      ProductID|       Category|Sub-Category|         ProductName|   Sales|Quantity|Discount|  Profit|OrderID_index|ShipDate_index|CustomerID_index|City_index|Sub-Category_index|Product

In [29]:

all_columns = one.columns

encoded_columns = [col_name for col_name in all_columns if "_encoded" in col_name]

print(encoded_columns)


['OrderID_index_encoded', 'ShipDate_index_encoded', 'CustomerID_index_encoded', 'City_index_encoded', 'Sub-Category_index_encoded', 'ProductName_index_encoded', 'ShipMode_index_encoded', 'Sales_index_encoded', 'Quantity_index_encoded', 'Discount_index_encoded']


In [30]:


def add_reference_ids(df, columns_to_reference):

    temp_df = df.select(columns_to_reference).distinct()
    for column in columns_to_reference:
        temp_df = temp_df.withColumn(column + "REF", monotonically_increasing_id())

    result_df = df.join(temp_df, on=columns_to_reference, how="left")

    return result_df


columns_to_reference = ["CustomerID", "OrderID", "ProductID"]


df_with_refs = add_reference_ids(df, columns_to_reference)


df_with_refs.show()


+----------+--------------+---------------+-----+----------+----------+--------------+------------------+-----------+-------------+---------------+--------------+----------+-------+---------------+------------+--------------------+--------+--------+--------+--------+-------------+--------------+----------------+----------+------------------+-----------------+--------------+-----------+--------------+--------------+-------------+----------+------------+
|CustomerID|       OrderID|      ProductID|RowID| OrderDate|  ShipDate|      ShipMode|      CustomerName|    Segment|      Country|           City|         State|PostalCode| Region|       Category|Sub-Category|         ProductName|   Sales|Quantity|Discount|  Profit|OrderID_index|ShipDate_index|CustomerID_index|City_index|Sub-Category_index|ProductName_index|ShipMode_index|Sales_index|Quantity_index|Discount_index|CustomerIDREF|OrderIDREF|ProductIDREF|
+----------+--------------+---------------+-----+----------+----------+--------------+

In [31]:
df_with_refs.show(4)

df = df_with_refs.select("CustomerIDREF","OrderIDREF","ProductIDREF","Quantity")
df.dtypes

+----------+--------------+---------------+-----+----------+----------+--------------+---------------+---------+-------------+---------------+----------+----------+------+---------------+------------+--------------------+--------+--------+--------+--------+-------------+--------------+----------------+----------+------------------+-----------------+--------------+-----------+--------------+--------------+-------------+----------+------------+
|CustomerID|       OrderID|      ProductID|RowID| OrderDate|  ShipDate|      ShipMode|   CustomerName|  Segment|      Country|           City|     State|PostalCode|Region|       Category|Sub-Category|         ProductName|   Sales|Quantity|Discount|  Profit|OrderID_index|ShipDate_index|CustomerID_index|City_index|Sub-Category_index|ProductName_index|ShipMode_index|Sales_index|Quantity_index|Discount_index|CustomerIDREF|OrderIDREF|ProductIDREF|
+----------+--------------+---------------+-----+----------+----------+--------------+---------------+----

[('CustomerIDREF', 'bigint'),
 ('OrderIDREF', 'bigint'),
 ('ProductIDREF', 'bigint'),
 ('Quantity', 'int')]

In [32]:
df_with_refs.dtypes

[('CustomerID', 'string'),
 ('OrderID', 'string'),
 ('ProductID', 'string'),
 ('RowID', 'int'),
 ('OrderDate', 'string'),
 ('ShipDate', 'string'),
 ('ShipMode', 'string'),
 ('CustomerName', 'string'),
 ('Segment', 'string'),
 ('Country', 'string'),
 ('City', 'string'),
 ('State', 'string'),
 ('PostalCode', 'int'),
 ('Region', 'string'),
 ('Category', 'string'),
 ('Sub-Category', 'string'),
 ('ProductName', 'string'),
 ('Sales', 'string'),
 ('Quantity', 'int'),
 ('Discount', 'string'),
 ('Profit', 'string'),
 ('OrderID_index', 'double'),
 ('ShipDate_index', 'double'),
 ('CustomerID_index', 'double'),
 ('City_index', 'double'),
 ('Sub-Category_index', 'double'),
 ('ProductName_index', 'double'),
 ('ShipMode_index', 'double'),
 ('Sales_index', 'double'),
 ('Quantity_index', 'double'),
 ('Discount_index', 'double'),
 ('CustomerIDREF', 'bigint'),
 ('OrderIDREF', 'bigint'),
 ('ProductIDREF', 'bigint')]

In [33]:


df = df.select([
    col_name if col_type == "string" else col(col_name).cast("string")
    for col_name, col_type in df.dtypes
])


user_indexer = StringIndexer(inputCol="CustomerIDREF", outputCol="userIndex")
item_indexer = StringIndexer(inputCol="ProductIDREF", outputCol="itemIndex")
quantity_indexer = StringIndexer(inputCol="Quantity", outputCol="rating")

user_indexer_model = user_indexer.fit(df)
item_indexer_model = item_indexer.fit(df)
quantity_indexer_model = quantity_indexer.fit(df)

user_df = user_indexer_model.transform(df)
item_df = item_indexer_model.transform(user_df)
indexed_df = quantity_indexer_model.transform(item_df)


(trainingData, testData) = indexed_df.randomSplit([0.8, 0.2])


als = ALS(maxIter=5, regParam=0.01, userCol="userIndex", itemCol="itemIndex", ratingCol="rating",
          coldStartStrategy="drop")


model = als.fit(trainingData)

predictions = model.transform(testData)


evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Erro Quadrático Médio da Raiz (RMSE):", rmse)


user_index = 0
user_recommendations = model.recommendForUserSubset(user_df.where(col("userIndex") == user_index), 10)


user_recommendations = user_recommendations.select("userIndex", "recommendations.itemIndex", "recommendations.rating")

user_recommendations.show(truncate=False)

all_user_recommendations = model.recommendForAllUsers(10)


all_user_recommendations = all_user_recommendations.select("userIndex", "recommendations.itemIndex", "recommendations.rating")

all_user_recommendations.show(truncate=False)


Erro Quadrático Médio da Raiz (RMSE): 4.360724578512602
+---------+---------+------+
|userIndex|itemIndex|rating|
+---------+---------+------+
+---------+---------+------+

+---------+------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------+
|userIndex|itemIndex                                                   |rating                                                                                                     |
+---------+------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------+
|1        |[3827, 4034, 3238, 7889, 7104, 1393, 7014, 5899, 7421, 45]  |[7.40916, 6.263707, 5.967018, 5.618666, 5.1259212, 5.041609, 4.866453, 4.7286024, 4.6685376, 4.598825]     |
|3        |[148, 138, 128, 124, 120, 119, 118, 117, 114, 113]          |[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 