In [2]:
from typing import Optional

import requests
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.ml.recommendation import ALSModel
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode
from pyspark.sql.types import FloatType

In [None]:


def preprocess_data(data: DataFrame) -> DataFrame:
    """Index the categorical columns, cast the numeric columns and drop incomplete rows.

    ``Customer type`` and ``Product line`` are each encoded with a
    ``StringIndexer`` into ``Customer type Index`` / ``Product line Index``.
    ``Unit price``, ``Quantity``, ``Tax 5%``, ``Total`` and ``Rating`` are cast
    to ``FloatType``.

    Args:
        data: Raw DataFrame containing the columns named above.

    Returns:
        A new DataFrame with the two index columns added, the numeric columns
        cast to float, and every row that contains a null removed.
    """
    categorical_columns = ["Customer type", "Product line"]
    numeric_columns = ["Unit price", "Quantity", "Tax 5%", "Total", "Rating"]

    # StringIndexer defaults to handleInvalid="error", so transform() fails on
    # a null label. Rows missing a category therefore have to be removed
    # *before* indexing; the final dropna() alone would run too late.
    data = data.dropna(subset=categorical_columns)

    for col_name in categorical_columns:
        indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name} Index")
        data = indexer.fit(data).transform(data)

    for col_name in numeric_columns:
        data = data.withColumn(col_name, col(col_name).cast(FloatType()))

    # Removes rows with a null in any column, including nulls left behind by
    # the casts above (non-ANSI casting yields null for unparsable values).
    return data.dropna()

def read_and_preprocess_data(file_path: str) -> DataFrame:
    """Load a CSV file (first line is the header) and run ``preprocess_data`` on it.

    Args:
        file_path: Location of the CSV file to read.

    Returns:
        The DataFrame produced by ``preprocess_data`` for the loaded file.
    """
    # getOrCreate() hands back the active session if one already exists.
    session = SparkSession.builder.appName("RestaurantRecommendationALS").getOrCreate()
    csv_reader = session.read.format("csv").option("header", "true")
    return preprocess_data(csv_reader.load(file_path))

def train_and_evaluate_model(data: DataFrame, seed: Optional[int] = 42) -> Optional[ALSModel]:
    """Fit an ALS model on an 80/20 split of ``data`` and print its test-set MAE.

    Args:
        data: Preprocessed DataFrame; must contain ``Customer type Index``,
            ``Product line Index`` and ``Rating``.
        seed: Seed passed to both the train/test split and ALS so that runs
            are reproducible. Pass ``None`` for an unseeded run.

    Returns:
        The fitted ``ALSModel``, or ``None`` when ``data`` has no rows.

    Raises:
        ValueError: If any required column is missing from ``data``.
    """
    required_columns = ["Customer type Index", "Product line Index", "Rating"]
    missing_columns = set(required_columns) - set(data.columns)

    if missing_columns:
        raise ValueError(f"Erro Valor nao encontrado: {missing_columns}")

    # Fetching one row is enough to detect an empty frame; count() would have
    # to process every row just to be compared with zero.
    if not data.head(1):
        print("Erro Valor nao encontrado ")
        return None

    train_data, test_data = data.randomSplit([0.8, 0.2], seed=seed)

    als = ALS(
        maxIter=5,
        regParam=0.01,
        userCol="Customer type Index",
        itemCol="Product line Index",
        ratingCol="Rating",
        # The default "nan" strategy emits NaN predictions for users/items
        # that did not appear in the training split, and a single NaN turns
        # the evaluation metric into NaN. "drop" removes those rows instead.
        coldStartStrategy="drop",
        seed=seed,
    )
    model = als.fit(train_data)

    predictions = model.transform(test_data)

    evaluator = RegressionEvaluator(metricName="mae", labelCol="Rating", predictionCol="prediction")
    mae = evaluator.evaluate(predictions)

    print("Mean Absolute Error (MAE):", mae)

    return model

def flatten_recommendations(recommendations: DataFrame) -> DataFrame:
    """Expand the ``recommendations`` array column into one row per entry.

    Args:
        recommendations: DataFrame with a ``Customer type Index`` column and
            an array column named ``recommendations``.

    Returns:
        DataFrame with ``Customer type Index`` and a ``recommendation``
        column holding one element of the original array per row.
    """
    single_recommendation = explode("recommendations").alias("recommendation")
    return recommendations.select("Customer type Index", single_recommendation)

def get_recommendations_by_customer_id(spark: SparkSession, model: ALSModel, customer_id: Optional[int] = None, num_recommendations: int = 5) -> DataFrame:
    """Return flattened top-N recommendations for one customer index or for all.

    Args:
        spark: Active session, used to build the one-row lookup DataFrame.
        model: Fitted ALS model. The recommend* methods called here are
            invoked on the object returned by ``ALS.fit``, not on the ``ALS``
            estimator itself.
        customer_id: Value to look up in ``Customer type Index``. When
            ``None``, recommendations are produced for every user in the model.
            NOTE(review): an id the model was not trained on presumably
            yields no rows rather than an error; verify against Spark.
        num_recommendations: Maximum number of items requested per user.

    Returns:
        The output of ``flatten_recommendations``: one row per
        (``Customer type Index``, ``recommendation``) pair.
    """
    if customer_id is None:
        user_recs = model.recommendForAllUsers(num_recommendations)
    else:
        # recommendForUserSubset expects a DataFrame holding the user column.
        customer_id_df = spark.createDataFrame([(customer_id,)], ["Customer type Index"])
        user_recs = model.recommendForUserSubset(customer_id_df, num_recommendations)

    return flatten_recommendations(user_recs)

if __name__ == "__main__":
    # Build the session first: getOrCreate() returns an already-running
    # session unchanged, so the explicit master only matters if this call is
    # the one that creates it. read_and_preprocess_data() then reuses it.
    spark = SparkSession.builder.master("local[*]").appName("RestaurantRecommendationALS").getOrCreate()
    sc = spark.sparkContext

    try:
        file_path = "supermarket_sales.csv"
        data = read_and_preprocess_data(file_path)
        model = train_and_evaluate_model(data)

        if model is None:
            # train_and_evaluate_model() returns None for an empty dataset;
            # there is nothing to recommend from in that case.
            print("No model was trained; skipping recommendations.")
        else:
            # NOTE(review): this value is matched against "Customer type Index",
            # i.e. a StringIndexer index, not a raw customer id. Confirm that
            # 3 is an index that actually exists in the data.
            user_id = 3
            user_recs = get_recommendations_by_customer_id(spark, model, customer_id=user_id, num_recommendations=5)

            user_recs_pandas = user_recs.toPandas()

            print(f"Top 5 Recommendations for Customer {user_id}:")
            print(user_recs_pandas)

            all_user_recs = get_recommendations_by_customer_id(spark, model, num_recommendations=100)

            all_user_recs_pandas = all_user_recs.toPandas()

            print("Top 100 Recommendations for All Customers:")
            print(all_user_recs_pandas)
    finally:
        # Always release the session, even when a step above raises.
        spark.stop()


In [None]:

# Apache Livy test
def run_spark_job_using_livy(livy_url: str, spark_code: str, timeout: float = 30.0) -> Optional[int]:
    """POST ``spark_code`` to the ``/batches`` endpoint of a Livy server.

    Args:
        livy_url: Base URL of the Livy server; a trailing slash is tolerated.
        spark_code: Source text sent in the ``code`` field of the request.
        timeout: Seconds to wait for the server before ``requests`` gives up,
            so an unreachable server cannot block the caller forever.

    Returns:
        The ``id`` field of the response body when the server answers 201,
        otherwise ``None`` (after printing the error body).

    Raises:
        requests.RequestException: If the request cannot be sent or times out.
    """
    headers = {'Content-Type': 'application/json'}
    # NOTE(review): Livy's documented POST /batches body is built around a
    # `file` field (the application to run), while `code` and `kind` belong
    # to the interactive /sessions and /sessions/{id}/statements API. Confirm
    # which flow is intended before relying on this request succeeding.
    data = {
        'code': spark_code,
        'kind': 'spark',
        'conf': {
            'spark.jars.packages': 'org.apache.spark:spark-avro_2.12:3.1.1'
        }
    }

    # Strip a trailing slash so the path never becomes "//batches".
    endpoint = f"{livy_url.rstrip('/')}/batches"
    response = requests.post(endpoint, json=data, headers=headers, timeout=timeout)

    if response.status_code == 201:
        batch_id = response.json()['id']
        print(f"Feito ! Batch ID: {batch_id}")
        return batch_id

    print("Erro")
    try:
        print(response.json())
    except ValueError:
        # Error responses are not guaranteed to carry a JSON body; fall back
        # to the raw text instead of raising from inside the error path.
        print(response.text)
    return None

if __name__ == "__main__":
    # Example Python file contents to submit through Livy.

    # The scheme must be followed by "://"; the previous "http:127.0.0.1:8998"
    # had no host part and is not a usable URL.
    livy_url = "http://127.0.0.1:8998"

    # Assembled line by line so every statement starts at column 0: a snippet
    # carrying this block's indentation would not be valid top-level Python.
    spark_code = "\n".join([
        "from pyspark.sql import SparkSession",
        'spark = SparkSession.builder.appName("RecommendationALS").getOrCreate()',
        "",
        '# spark.sparkContext.setLogLevel("ERROR")',
        "# ... (teste sql server)",
        "",
        "spark.stop()",
    ])
    batch_id = run_spark_job_using_livy(livy_url, spark_code)
