In [17]:
import functools
import os

import findspark
import numpy as np
import plotly.express as px
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import Evaluator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.fpm import FPGrowth
from pyspark.ml.recommendation import ALS
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.models import Sequential
from transformers import TFAutoModel


In [10]:
# Location of the Super Store CSV that the reader cells below load.
file_path = r"D:\estudos_0012708\dados_super_store.csv"

In [12]:
# Register the local PySpark, Hadoop and JDK install directories as
# environment variables for this process.
os.environ.update(
    {
        "SPARK_HOME": r"D:\estudos_0012708\venv\Lib\site-packages\pyspark",
        "HADOOP_HOME": r"D:\hadoop-3.3.6",
        "JAVA_HOME": r"C:\Program Files\Java\jdk-20",
    }
)

In [8]:
# Build (or reuse) a local Spark session that uses every available core.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName("analise_nlp")
    .getOrCreate()
)

In [None]:
# Empty Keras Sequential model; no layers are added to it in this cell.
# NOTE(review): `regressor` is not referenced by any other cell visible here —
# confirm whether it is still needed.
regressor = Sequential()

In [None]:
def generate_line_plot(data, x, y, name):
    """Display a violet Plotly line chart on a transparent background.

    Args:
        data: Data handed straight to ``plotly.express.line``.
        x: Column (or values) plotted on the horizontal axis.
        y: Column (or values) plotted on the vertical axis.
        name: Text used as the chart title.

    Returns:
        None. The figure is rendered with ``show()`` and not returned.
    """
    # Transparent paper/plot backgrounds with white text, plus hover labels
    # that follow the x position.
    layout_options = dict(
        paper_bgcolor='rgba(0,0,0,0)',
        plot_bgcolor='rgba(0,0,0,0)',
        font_color='white',
        hovermode='x',
    )
    line_style = dict(color='rgb(148, 0, 211)')

    chart = px.line(data, x=x, y=y, title=name)
    chart.update_layout(**layout_options)
    chart.update_traces(line=line_style)
    chart.show()


In [None]:

def replace_null_with_zero(func):
    """Decorator that replaces NULLs with 0 in the DataFrame returned by ``func``.

    Args:
        func: Callable returning a Spark DataFrame.

    Returns:
        A wrapper with the same call signature as ``func`` that returns the
        DataFrame with every column rewritten as
        ``CASE WHEN col IS NULL THEN 0 ELSE col END`` under its original name.
    """
    # Preserve the wrapped function's __name__/__doc__ so the decorated loader
    # is still identifiable in tracebacks and help().
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        result_df = func(*args, **kwargs)
        filled_columns = [
            F.when(F.col(name).isNull(), 0).otherwise(F.col(name)).alias(name)
            for name in result_df.columns
        ]
        return result_df.select(*filled_columns)
    return wrapper



def select_best_features(func):
    """Decorator that appends chi-squared-selected features to a DataFrame.

    The DataFrame returned by ``func`` must contain the columns ``valor``,
    ``order``, ``clienteid`` and ``target``. The three feature columns are
    assembled into a ``features`` vector, and a ``ChiSqSelector`` fitted against
    ``target`` writes its selection to ``selectedFeatures``.

    Args:
        func: Callable returning a Spark DataFrame.

    Returns:
        A wrapper with the same call signature as ``func`` that returns the
        DataFrame with the extra ``features`` and ``selectedFeatures`` columns.
    """
    # NOTE(review): ChiSqSelector is marked deprecated in recent Spark releases
    # in favour of UnivariateFeatureSelector — confirm against the Spark
    # version in use.
    # Preserve the wrapped function's __name__/__doc__ for introspection.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        df = func(*args, **kwargs)

        feature_cols = ['valor', 'order', 'clienteid']
        assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
        assembled_df = assembler.transform(df)

        # numTopFeatures=5 exceeds the three assembled inputs, so the selector
        # is allowed to keep all of them.
        selector = ChiSqSelector(
            numTopFeatures=5,
            featuresCol="features",
            outputCol="selectedFeatures",
            labelCol="target",
        )
        return selector.fit(assembled_df).transform(assembled_df)
    return wrapper



def convert_categorical_to_onehot(func):
    """Decorator that one-hot encodes every string column of a DataFrame.

    For each ``StringType`` column ``c`` in the DataFrame returned by ``func``
    a ``c_onehot`` vector column is appended. The original columns are kept.

    Args:
        func: Callable returning a Spark DataFrame.

    Returns:
        A wrapper with the same call signature as ``func`` that returns the
        encoded DataFrame (or the DataFrame unchanged when it has no string
        columns).
    """
    # Preserve the wrapped function's __name__/__doc__ for introspection.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        result_df = func(*args, **kwargs)
        string_cols = [
            field.name
            for field in result_df.schema.fields
            if isinstance(field.dataType, StringType)
        ]
        if not string_cols:
            return result_df

        # OneHotEncoder works on numeric category indices and must be fitted,
        # so each string column is first indexed and the whole chain is run as
        # a fitted Pipeline.
        index_cols = [f"{name}__index" for name in string_cols]
        stages = []
        for name, index_name in zip(string_cols, index_cols):
            # handleInvalid="keep" gives NULL/unseen values their own index
            # instead of failing the job.
            stages.append(
                StringIndexer(inputCol=name, outputCol=index_name, handleInvalid="keep")
            )
            stages.append(
                OneHotEncoder(inputCol=index_name, outputCol=f"{name}_onehot")
            )

        encoded_df = Pipeline(stages=stages).fit(result_df).transform(result_df)
        # The index columns are only an intermediate step; drop them so the
        # output holds just the original columns plus the *_onehot vectors.
        return encoded_df.drop(*index_cols)
    return wrapper



In [None]:

class _BasketCoverageEvaluator(Evaluator):
    """Score FPGrowth output by the share of baskets that get a prediction.

    The dataset passed in is the output of ``FPGrowthModel.transform``; a
    basket counts as covered when its ``prediction`` array is non-empty.
    """

    def _evaluate(self, dataset):
        total = dataset.count()
        if total == 0:
            return 0.0
        covered = dataset.filter(F.size(F.col("prediction")) > 0).count()
        return covered / total

    def isLargerBetter(self):
        return True


def optimize_fpgrowth(func):
    """Decorator that grid-searches FPGrowth on the DataFrame returned by ``func``.

    The DataFrame must expose an ``items`` array column. 80% of the rows
    (seed 123) are used for a 5-fold cross-validation over ``minSupport`` in
    {0.05, 0.1, 0.2} and ``minConfidence`` in {0.1, 0.2, 0.3}.

    Args:
        func: Callable returning a Spark DataFrame of baskets.

    Returns:
        A wrapper with the same call signature as ``func`` that returns the
        fitted ``CrossValidatorModel``.
    """
    # Preserve the wrapped function's __name__/__doc__ for introspection.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        df = func(*args, **kwargs)

        # Only the 80% partition is used; the remaining rows are left out of
        # the search entirely.
        train_data, _ = df.randomSplit([0.8, 0.2], seed=123)

        fp_growth = FPGrowth(itemsCol="items", minSupport=0.1, minConfidence=0.2)

        param_grid = (
            ParamGridBuilder()
            .addGrid(fp_growth.minSupport, [0.05, 0.1, 0.2])
            .addGrid(fp_growth.minConfidence, [0.1, 0.2, 0.3])
            .build()
        )

        # CrossValidator cannot run without an evaluator, and Spark ships none
        # for association rules, so validation folds are scored by prediction
        # coverage.
        # NOTE(review): coverage tends to favour looser thresholds; swap in a
        # domain-specific Evaluator if a different trade-off is wanted.
        crossval = CrossValidator(
            estimator=fp_growth,
            estimatorParamMaps=param_grid,
            evaluator=_BasketCoverageEvaluator(),
            numFolds=5,
        )

        return crossval.fit(train_data)
    return wrapper




In [None]:



@optimize_fpgrowth
def read_data_and_optimize_fpgrowth(file_path):
    """Read a CSV and build one basket of items per transaction.

    Args:
        file_path: Path of a CSV (with header) that contains
            ``transaction_id`` and ``item_id`` columns.

    Returns:
        A DataFrame with ``transaction_id`` and an ``items`` array column;
        the decorator then turns it into a fitted model.
    """
    # NOTE(review): other cells read Quantity/Discount/Profit/Sales from this
    # same path — confirm the file also provides transaction_id and item_id.
    transactions = spark.read.csv(file_path, header=True, inferSchema=True)
    # collect_set rather than collect_list: FPGrowth rejects baskets that
    # contain the same item more than once.
    return transactions.groupBy("transaction_id").agg(
        F.collect_set("item_id").alias("items")
    )


optimized_fpgrowth_model = read_data_and_optimize_fpgrowth(file_path)




def optimize_als(func):
    """Decorator that cross-validates an ALS recommender on ``func``'s output.

    The DataFrame returned by ``func`` must contain ``user``, ``item`` and
    ``rating`` columns. It is split 80/20 (seed 123); a 5-fold
    ``CrossValidator`` searches ``rank``, ``maxIter`` and ``regParam`` on the
    training part using RMSE, and the RMSE on the held-out part is printed.

    Args:
        func: Callable returning a Spark DataFrame of ratings.

    Returns:
        A wrapper with the same call signature as ``func`` that returns the
        fitted ``CrossValidatorModel``.
    """
    # Preserve the wrapped function's __name__/__doc__ for introspection.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        df = func(*args, **kwargs)

        train_data, test_data = df.randomSplit([0.8, 0.2], seed=123)

        # coldStartStrategy="drop" removes rows whose prediction is NaN so the
        # RMSE stays defined for users/items unseen during training.
        als = ALS(userCol="user", itemCol="item", ratingCol="rating", coldStartStrategy="drop")

        param_grid = (
            ParamGridBuilder()
            .addGrid(als.rank, [10, 20, 30])
            .addGrid(als.maxIter, [10, 20, 30])
            .addGrid(als.regParam, [0.01, 0.1, 0.2])
            .build()
        )

        evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

        crossval = CrossValidator(
            estimator=als,
            estimatorParamMaps=param_grid,
            evaluator=evaluator,
            numFolds=5,
        )
        model = crossval.fit(train_data)

        # Report the generalisation error on the rows held out of the search.
        predictions = model.transform(test_data)
        rmse = evaluator.evaluate(predictions)
        print(f"RMSE no conjunto de teste: {rmse:.2f}")

        return model
    return wrapper



In [None]:


@optimize_als
def read_data_and_optimize_als(file_path):
    """Read a ratings CSV and rename its columns to user / item / rating.

    Args:
        file_path: Path of a CSV (with header) that contains ``userId``,
            ``movieId`` and ``rating`` columns.

    Returns:
        A DataFrame with ``user``, ``item`` and ``rating`` columns; the
        decorator then turns it into a fitted model.
    """
    ratings = spark.read.csv(file_path, header=True, inferSchema=True)
    return ratings.selectExpr("userId as user", "movieId as item", "rating")


# Run the ALS search on the configured CSV.
optimized_als_model = read_data_and_optimize_als(file_path)



@convert_categorical_to_onehot
def read_csv_and_convert_to_onehot(file_path):
    """Read a CSV with a header row and inferred column types.

    The decorator post-processes the returned DataFrame.
    """
    return spark.read.csv(file_path, header=True, inferSchema=True)


# Load the configured CSV through the one-hot decorator.
df_with_onehot = read_csv_and_convert_to_onehot(file_path)


@replace_null_with_zero
def read_csv_with_null_replacement(file_path):
    """Read a CSV with a header row and inferred column types.

    The decorator post-processes the returned DataFrame.
    """
    return spark.read.csv(file_path, header=True, inferSchema=True)


# Load the configured CSV through the NULL-replacement decorator.
df_with_replacement = read_csv_with_null_replacement(file_path)



@select_best_features
def read_csv_and_select_features(file_path):
    """Read a CSV with a header row and inferred column types.

    The decorator post-processes the returned DataFrame.
    """
    return spark.read.csv(file_path, header=True, inferSchema=True)


# Load the configured CSV through the feature-selection decorator.
df_selected_features = read_csv_and_select_features(file_path)

In [None]:
def reader_csv_files(csvfiles):
    """Read a semicolon-delimited CSV and return its column names.

    Args:
        csvfiles: Path of the CSV file (first row is the header).

    Returns:
        list[str]: The column names Spark parsed from the header.
    """
    df = spark.read.csv(csvfiles, header=True, inferSchema=True, sep=";")
    # Return the names instead of discarding them so the calling cell can
    # display or reuse them.
    return df.columns


file_path = r"D:\estudos_0012708\dados_super_store.csv"
reader_csv_files(file_path)

In [None]:

def calculate_sales_and_profit(df):
    """Summarise total sales and average profit per category.

    Args:
        df: Spark DataFrame with ``Category``, ``Sales`` and ``Profit`` columns.

    Returns:
        A DataFrame with columns ``Category``, ``TotalSales`` (sum of Sales)
        and ``AvgProfit`` (mean of Profit). The result is also printed with
        ``show()``.
    """
    # One grouped aggregation yields both metrics, so no join between two
    # separately grouped frames is needed. Rows whose Category is NULL form
    # their own group here instead of being lost in an inner join.
    result = df.groupBy("Category").agg(
        F.sum("Sales").alias("TotalSales"),
        F.avg("Profit").alias("AvgProfit"),
    )

    result.show()
    return result




In [None]:

def _rows_to_arrays(rows, n_features):
    """Turn collected rows (features..., label) into Keras-ready float arrays."""
    values = np.array([[float(value) for value in row] for row in rows], dtype="float32")
    values = values.reshape(-1, n_features + 1)
    # Keras LSTM layers expect (samples, timesteps, features); every row is fed
    # as a sequence of length one.
    features = values[:, :n_features].reshape(-1, 1, n_features)
    labels = values[:, n_features]
    return features, labels


def train_lstm_model(data, feature_cols, label_col):
    """Train a small Keras LSTM regressor on columns of a Spark DataFrame.

    Rows with missing values in the selected columns are dropped, the rest is
    split 80/20 (seed 123), both parts are collected to the driver, and an
    ``LSTM(32) -> Dense(1)`` network is fitted with mean-squared-error loss
    for 10 epochs. The RMSE on the 20% part is printed when it is non-empty.

    Args:
        data: Spark DataFrame holding the feature and label columns.
        feature_cols: Names of the numeric feature columns.
        label_col: Name of the numeric label column.

    Returns:
        The fitted Keras ``Sequential`` model.

    Raises:
        ValueError: If ``feature_cols`` is empty, if no training rows remain,
            or if a selected value cannot be converted to ``float``.
    """
    feature_cols = list(feature_cols)
    if not feature_cols:
        raise ValueError("feature_cols must contain at least one column name")
    n_features = len(feature_cols)

    # Keras cannot consume NULLs, so incomplete rows are removed up front.
    selected = data.select(*feature_cols, label_col).na.drop()
    train_data, test_data = selected.randomSplit([0.8, 0.2], seed=123)

    # The Keras layer is not a Spark ML stage, so the data is brought to the
    # driver as NumPy arrays. This assumes the selected columns fit in memory.
    x_train, y_train = _rows_to_arrays(train_data.collect(), n_features)
    x_test, y_test = _rows_to_arrays(test_data.collect(), n_features)
    if len(y_train) == 0:
        raise ValueError("no training rows left after dropping missing values")

    model = Sequential()
    model.add(LSTM(32))
    model.add(Dense(1))
    model.compile(optimizer="adam", loss="mse")
    model.fit(x_train, y_train, epochs=10, batch_size=128, verbose=0)

    if len(y_test) > 0:
        predictions = model.predict(x_test, verbose=0).reshape(-1)
        rmse = float(np.sqrt(np.mean((predictions - y_test) ** 2)))
        print(f"Root Mean Squared Error (RMSE): {rmse:.2f}")

    return model


file_path = r"D:\estudos_0012708\dados_super_store.csv"

# NOTE(review): this read uses the default ',' separator while other cells
# read the same file with sep=";" — confirm which delimiter the file uses.
data = spark.read.csv(file_path, header=True, inferSchema=True)

feature_cols = ['Quantity', 'Discount', 'Profit']
label_col = "Sales"

trained_model = train_lstm_model(data, feature_cols, label_col)

# The Spark session is deliberately left running here: a later cell still
# calls spark.read, and stopping the session at this point made that cell
# fail when the notebook is run top to bottom.


In [None]:
# Read the CSV using ';' as the field separator and show the parsed column names.
file_path = r"D:\estudos_0012708\dados_super_store.csv"

df = spark.read.csv(file_path, header=True, inferSchema=True, sep=";")
df.columns
