<a href="https://colab.research.google.com/github/Gangaraju1411/Databricks/blob/main/Recommendations.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=126f46f3b18128af234e22aad4ef298a0a6b2b9fa7f4c0132574403e77f59d68
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [None]:
import pandas as pd
import random
from faker import Faker

# Fixed seed so that "Restart & Run All" regenerates the same synthetic data.
# (purchase_date still depends on the current date, because Faker draws it
# from "this year so far".)
SEED = 42
random.seed(SEED)
Faker.seed(SEED)

# Initialize Faker for generating fake customer names and details
fake = Faker()

# Product catalogue used to draw purchases from
products = [
    {'product_id': 1, 'product_name': 'Smartphone', 'category': 'Electronics'},
    {'product_id': 2, 'product_name': 'Laptop', 'category': 'Electronics'},
    {'product_id': 3, 'product_name': 'Blender', 'category': 'Home Appliances'},
    {'product_id': 4, 'product_name': 'Air Conditioner', 'category': 'Home Appliances'},
    {'product_id': 5, 'product_name': 'Washing Machine', 'category': 'Home Appliances'},
    {'product_id': 6, 'product_name': 'Shoes', 'category': 'Fashion'},
    {'product_id': 7, 'product_name': 'T-shirt', 'category': 'Fashion'},
    {'product_id': 8, 'product_name': 'Watch', 'category': 'Fashion'},
    {'product_id': 9, 'product_name': 'Headphones', 'category': 'Electronics'},
    {'product_id': 10, 'product_name': 'Tablet', 'category': 'Electronics'}
]

NUM_CUSTOMERS = 50    # customer ids are drawn from 1..NUM_CUSTOMERS
NUM_PURCHASES = 200   # number of purchase rows to generate

# Build exactly one profile per customer id. Drawing name/age/gender inside
# the purchase loop would give the same customer_id a different identity on
# every row.
customers = {
    customer_id: {
        'customer_name': fake.name(),
        'age': random.randint(18, 65),
        'gender': random.choice(['Male', 'Female']),
    }
    for customer_id in range(1, NUM_CUSTOMERS + 1)
}

# Generate the purchase records: a random customer buys a random product
data = []
for _ in range(NUM_PURCHASES):
    customer_id = random.randint(1, NUM_CUSTOMERS)
    profile = customers[customer_id]
    product = random.choice(products)

    data.append({
        'customer_id': customer_id,
        'customer_name': profile['customer_name'],
        'age': profile['age'],
        'gender': profile['gender'],
        'product_id': product['product_id'],
        'product_name': product['product_name'],
        'purchase_amount': round(random.uniform(50, 1000), 2),
        'purchase_date': fake.date_this_year(),
        'rating': random.randint(1, 5),
        'category': product['category']
    })

# Convert the list to a pandas DataFrame
df = pd.DataFrame(data)

# Save the dataset as a CSV for further use
df.to_csv('customer_purchase_data.csv', index=False)

# Display the first few rows of the DataFrame
df.head()


In [None]:
# Render the purchase DataFrame `df` using the notebook's rich display.
display(df)

In [None]:
import os

from pyspark.ml.recommendation import ALSModel
from pyspark.sql import SparkSession

# Loading a saved model requires an active Spark session, and on a fresh
# kernel none has been created yet when this cell runs.
spark = SparkSession.builder.appName("ProductRecommendation").getOrCreate()

# Load a previously saved ALS model, if there is one.
# On a fresh runtime the model has not been trained and saved yet, so an
# unconditional load would abort "Restart & Run All" at this cell.
# NOTE(review): the existence check assumes model_path is on the local
# filesystem (as in a Colab run) - confirm before using a DBFS/HDFS path.
model_path = "models/als_model"
if os.path.isdir(model_path):
    model = ALSModel.load(model_path)
else:
    model = None
    print(f"No saved ALS model at '{model_path}' yet; run the training and save cells below first.")


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
import pandas as pd

# One seed for both the train/test split and ALS initialisation, so the
# reported RMSE and the recommendations are reproducible across runs.
SEED = 42

# Initialize Spark session
spark = SparkSession.builder.appName("ProductRecommendation").getOrCreate()

# Load the dataset
df = pd.read_csv('customer_purchase_data.csv')
spark_df = spark.createDataFrame(df)

# Show the dataset
spark_df.show(5)

# Prepare data for ALS (Alternating Least Squares).
# Only customer_id, product_id and rating are used. The generated data can
# contain the same (customer, product) pair several times with different
# ratings; average them so each pair is a single observation and cannot land
# in both the training and the test split.
als_data = (
    spark_df
    .select('customer_id', 'product_id', 'rating')
    .groupBy('customer_id', 'product_id')
    .agg({'rating': 'avg'})
    .withColumnRenamed('avg(rating)', 'rating')
)

# Split the data into training and testing sets
(training, test) = als_data.randomSplit([0.8, 0.2], seed=SEED)

# Create ALS model and fit to the training data.
# coldStartStrategy="drop" removes test rows whose customer or product was
# not seen in training, so the evaluator does not receive NaN predictions.
als = ALS(
    userCol="customer_id",
    itemCol="product_id",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True,
    seed=SEED,
)
model = als.fit(training)

# Make predictions on the test data
predictions = model.transform(test)

# Evaluate the model by calculating RMSE (Root Mean Squared Error)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

# Generate top 5 product recommendations for each customer
customer_recommendations = model.recommendForAllUsers(5)

# Show the recommendations
customer_recommendations.show(truncate=False)


In [None]:
from pyspark.ml.recommendation import ALS

# Save the ALS model that was fitted and evaluated in the training cell.
# Fitting a second, unseeded model here would persist a model different from
# the one whose RMSE was reported.
model_path = "models/als_model"

# overwrite() lets this cell be re-run; a plain save() raises once the
# target directory already exists.
model.write().overwrite().save(model_path)


In [None]:
%pip install pyngrok

Collecting pyngrok
  Downloading pyngrok-7.2.0-py3-none-any.whl.metadata (7.4 kB)
Downloading pyngrok-7.2.0-py3-none-any.whl (22 kB)
Installing collected packages: pyngrok
Successfully installed pyngrok-7.2.0


In [None]:
from flask import Flask, request, jsonify
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALSModel

# Spark session used by the request handler below.
spark = (
    SparkSession.builder
    .appName("ProductRecommendation")
    .getOrCreate()
)

# Directory the saved ALS model is read from.
model_path = "models/als_model"
model = ALSModel.load(model_path)

# Flask application object that the route below is registered on.
app = Flask(__name__)

@app.route('/recommendations', methods=['GET'])
def get_recommendations():
    """Return the top 5 recommended product ids for one customer.

    Query parameters:
        customer_id: integer id of the customer (required).

    Returns:
        200 with a JSON list of ``{"customer_id": int, "products": [int, ...]}``
        entries (the list is empty when the model returns no rows for the id);
        400 when ``customer_id`` is missing or is not an integer;
        500 with the error text when generating the recommendations fails.
    """
    # Get customer_id from request args
    raw_customer_id = request.args.get('customer_id')

    if not raw_customer_id:
        return jsonify({"error": "customer_id parameter is required"}), 400

    # A malformed id is a client error; validate it here so it is not reported
    # as a server failure by the generic handler below.
    try:
        customer_id = int(raw_customer_id)
    except ValueError:
        return jsonify({"error": "customer_id must be an integer"}), 400

    try:
        # Create a DataFrame with the customer_id to get recommendations
        user_df = spark.createDataFrame([(customer_id,)], ["customer_id"])

        # Generate top 5 recommendations for the specified customer
        recommendations = model.recommendForUserSubset(user_df, 5)

        # Each collected row carries the customer id and a list of
        # recommendation structs; keep only the product ids.
        recommendations_list = [
            {
                'customer_id': row['customer_id'],
                'products': [item['product_id'] for item in row['recommendations']],
            }
            for row in recommendations.collect()
        ]

        return jsonify(recommendations_list)

    except Exception as e:
        return jsonify({"error": str(e)}), 500

# Run the Flask app with ngrok tunnel
if __name__ == '__main__':
    import os
    from getpass import getpass

    from pyngrok import ngrok

    # Never commit the ngrok auth token. Read it from the environment and fall
    # back to an interactive prompt. Any token that was ever hard-coded in
    # this notebook must be treated as leaked and revoked in the ngrok
    # dashboard.
    ngrok_token = os.environ.get('NGROK_AUTHTOKEN') or getpass('ngrok auth token: ')
    ngrok.set_auth_token(ngrok_token)

    # Start ngrok and open a tunnel to the Flask app
    public_url = ngrok.connect(5000)

    # ngrok.connect() returns a tunnel object; print its public_url attribute
    # rather than the object itself, whose repr already contains quotes and an
    # arrow and garbles this banner.
    print(f" * Ngrok tunnel \"{public_url.public_url}\" -> \"http://127.0.0.1:5000\"")

    # Start the Flask app
    app.run(port=5000)




 * Ngrok tunnel "NgrokTunnel: "https://b0f8-34-66-19-152.ngrok-free.app" -> "http://localhost:5000"" -> "http://127.0.0.1:5000"
 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:5000
INFO:werkzeug:[33mPress CTRL+C to quit[0m
INFO:werkzeug:127.0.0.1 - - [11/Sep/2024 09:35:52] "[35m[1mGET /recommendations?customer_id=1 HTTP/1.1[0m" 500 -


In [16]:
from flask import Flask, request, jsonify
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALSModel

# Spark session used by the request handler below.
spark = (
    SparkSession.builder
    .appName("ProductRecommendation")
    .getOrCreate()
)

# Directory the saved ALS model is read from.
model_path = "models/als_model"
model = ALSModel.load(model_path)

# Purchase history CSV: read with a header row and inferred column types,
# then exposed to Spark SQL under the view name "purchase_data".
purchase_data_path = "customer_purchase_data.csv"
purchase_data_df = spark.read.csv(
    purchase_data_path,
    header=True,
    inferSchema=True,
)
purchase_data_df.createOrReplaceTempView("purchase_data")

# Flask application object that the route below is registered on.
app = Flask(__name__)

@app.route('/recommendations', methods=['GET'])
def get_recommendations():
    """Return the names of the top 5 recommended products for one customer.

    Query parameters:
        customer_id: integer id of the customer (required).

    Returns:
        200 with a JSON list of ``{"customer_id": int, "products": [str, ...]}``
        entries, product names listed in the order the model returned them
        (the list is empty when the model returns no rows for the id);
        400 when ``customer_id`` is missing or is not an integer;
        500 with the error text when generating the recommendations fails.
    """
    # Get customer_id from request args
    raw_customer_id = request.args.get('customer_id')

    if not raw_customer_id:
        return jsonify({"error": "customer_id parameter is required"}), 400

    # A malformed id is a client error; validate it here so it is not reported
    # as a server failure by the generic handler below.
    try:
        customer_id = int(raw_customer_id)
    except ValueError:
        return jsonify({"error": "customer_id must be an integer"}), 400

    try:
        # Create a DataFrame with the customer_id to get recommendations
        user_df = spark.createDataFrame([(customer_id,)], ["customer_id"])

        # Generate top 5 recommendations for the specified customer
        recommendations = model.recommendForUserSubset(user_df, 5)

        # product_id -> product_name lookup built from the purchase_data view
        product_names = {
            row['product_id']: row['product_name']
            for row in spark.sql(
                "SELECT DISTINCT product_id, product_name FROM purchase_data"
            ).collect()
        }

        # Resolve the names on the driver. The result is at most a handful of
        # rows, and a join followed by groupBy/collect_list does not guarantee
        # that the model's recommendation order survives the shuffle.
        recommendations_list = []
        for row in recommendations.collect():
            names = [product_names.get(item['product_id']) for item in row['recommendations']]
            recommendations_list.append({
                'customer_id': row['customer_id'],
                # Products without a known name are omitted.
                'products': [name for name in names if name is not None],
            })

        return jsonify(recommendations_list)

    except Exception as e:
        return jsonify({"error": str(e)}), 500

# Run the Flask app with ngrok tunnel
if __name__ == '__main__':
    import os
    from getpass import getpass

    from pyngrok import ngrok

    # Never commit the ngrok auth token. Read it from the environment and fall
    # back to an interactive prompt. Any token that was ever hard-coded in
    # this notebook must be treated as leaked and revoked in the ngrok
    # dashboard.
    ngrok_token = os.environ.get('NGROK_AUTHTOKEN') or getpass('ngrok auth token: ')
    ngrok.set_auth_token(ngrok_token)

    # Start ngrok and open a tunnel to the Flask app
    public_url = ngrok.connect(5000)

    # ngrok.connect() returns a tunnel object; print its public_url attribute
    # rather than the object itself, whose repr already contains quotes and an
    # arrow and garbles this banner.
    print(f" * Ngrok tunnel \"{public_url.public_url}\" -> \"http://127.0.0.1:5000\"")

    # Start the Flask app
    app.run(port=5000)




 * Ngrok tunnel "NgrokTunnel: "https://d302-34-66-19-152.ngrok-free.app" -> "http://localhost:5000"" -> "http://127.0.0.1:5000"
 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:5000
INFO:werkzeug:[33mPress CTRL+C to quit[0m
INFO:werkzeug:127.0.0.1 - - [11/Sep/2024 09:45:22] "GET /recommendations?customer_id=1 HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [11/Sep/2024 09:45:44] "GET /recommendations?customer_id=3 HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [11/Sep/2024 09:45:52] "GET /recommendations?customer_id=8 HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [11/Sep/2024 09:46:06] "GET /recommendations?customer_id=10 HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [11/Sep/2024 09:46:19] "GET /recommendations?customer_id=20 HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [11/Sep/2024 09:46:29] "GET /recommendations?customer_id=100 HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [11/Sep/2024 09:46:43] "GET /recommendations?customer_id=50 HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [11/Sep/2024 09:47:05] "GET /recommendations?customer_id=25 HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [11/Sep/2024 09:47:21] "GET /recommendations?customer_id=24 HT