In [1]:
from pyspark.sql import SparkSession
import random
import string

# Initialize Spark Session (reuses an existing one if already running)
spark = (
    SparkSession.builder
    .appName("VirtualHandshake")
    .getOrCreate()
)

# Section banner: title line followed by its underline.
print("1. Spark-RDDs: Scalable Virtual Handshake Applications\n"
      "======================================================")

# Helper function to generate random attributes
def generate_attributes():
    """Return one synthetic person's attributes as a "name,age,email" string.

    The name is 5-10 random ASCII letters, the age an int in [18, 80], and the
    email is the lower-cased name followed by "@example.com".
    """
    name_length = random.randint(5, 10)
    name = "".join(random.choices(string.ascii_letters, k=name_length))
    age = random.randint(18, 80)
    return ",".join((name, str(age), name.lower() + "@example.com"))

# Create PEOPLE dataset
def create_people(num_people=100000):
    """Build the PEOPLE dataset as a list of tuples.

    Each tuple is (id, coord_a, coord_b, attributes): id runs 0..num_people-1,
    both coordinates are random ints in [1, 10000], and attributes comes from
    generate_attributes().
    """
    people = []
    for person_id in range(num_people):
        # Draw order matters for reproducibility under a fixed seed:
        # first coordinate, second coordinate, then the attribute string.
        first_coord = random.randint(1, 10000)
        second_coord = random.randint(1, 10000)
        people.append((person_id, first_coord, second_coord, generate_attributes()))
    return people

# Create ACTIVATED dataset (subset of PEOPLE)
def create_activated(people, activation_rate=0.1):
    """Return a random subset of `people`, sampled without replacement.

    The subset size is int(len(people) * activation_rate), i.e. truncated
    toward zero.
    """
    sample_size = int(len(people) * activation_rate)
    return random.sample(people, sample_size)

# Create PEOPLE_WITH_HANDSHAKE_INFO dataset
def create_people_with_handshake_info(people, activated):
    """Append a "yes"/"no" handshake flag to each person record.

    The first four fields of every person are copied unchanged; the flag is
    "yes" when the person's id (index 0) matches the id of any record in
    `activated`, otherwise "no".
    """
    activated_ids = {record[0] for record in activated}
    enriched = []
    for person in people:
        flag = "yes" if person[0] in activated_ids else "no"
        enriched.append((person[0], person[1], person[2], person[3], flag))
    return enriched

# Generate the three local datasets (10,000 people: reduced size for demonstration).
people = create_people(10000)
activated = create_activated(people)
people_with_handshake_info = create_people_with_handshake_info(people, activated)

# Distribute each local dataset as an RDD, in the same order as listed.
people_rdd, activated_rdd, people_with_handshake_info_rdd = (
    spark.sparkContext.parallelize(dataset)
    for dataset in (people, activated, people_with_handshake_info)
)

print("Datasets created successfully.")

# Helper function for distance calculation
def distance(p1, p2):
    """Return the Euclidean distance between two records' coordinates.

    Coordinates are read from indices 1 and 2 of each record.
    """
    dx = p1[1] - p2[1]
    dy = p1[2] - p2[2]
    return (dx**2 + dy**2) ** 0.5

# Query 1
def query1(people_rdd, activated_rdd, max_distance=6):
    """Pair every person with each activated person within `max_distance`.

    Args:
        people_rdd: RDD of person records (id at index 0, coordinates at 1-2).
        activated_rdd: RDD of activated person records, same layout.
        max_distance: inclusive distance threshold (default 6).

    Returns:
        A lazy RDD of distinct (person_id, activated_id) tuples. A person is
        never paired with themself.
    """
    # Collect the activated set once and broadcast it. A broadcast is shipped
    # to each executor once and cached there, whereas a plain closure
    # variable is pickled into every task.
    activated_bc = people_rdd.context.broadcast(activated_rdd.collect())

    def find_nearby(person):
        return [(person[0], act[0]) for act in activated_bc.value
                if distance(person, act) <= max_distance and person[0] != act[0]]

    return people_rdd.flatMap(find_nearby).distinct()

# Run Query 1 and print up to five of its (person_id, activated_id) pairs.
result_query1 = query1(people_rdd, activated_rdd)
print("\nQuery 1 Result (sample):")
# take(5) brings at most five elements back to the driver; which five is not fixed by this code.
for pair in result_query1.take(5):
    print(pair)

# Query 2
def query2(people_rdd, activated_rdd, max_distance=6):
    """Return the ids of people within `max_distance` of an activated person.

    Args:
        people_rdd: RDD of person records (id at index 0, coordinates at 1-2).
        activated_rdd: RDD of activated person records, same layout.
        max_distance: inclusive distance threshold (default 6).

    Returns:
        A lazy RDD of distinct person ids. A person's proximity to their own
        activated record does not count.
    """
    # Broadcast rather than closure capture: shipped once per executor.
    activated_bc = people_rdd.context.broadcast(activated_rdd.collect())

    def is_near_activated(person):
        # any() stops at the first qualifying activated person instead of
        # materialising one duplicate id per match.
        return any(distance(person, act) <= max_distance and person[0] != act[0]
                   for act in activated_bc.value)

    return (people_rdd
            .filter(is_near_activated)
            .map(lambda person: person[0])
            .distinct())

# Run Query 2 and print up to five matching person ids.
result_query2 = query2(people_rdd, activated_rdd)
print("\nQuery 2 Result (sample):")
# Loop variable is `person_id`, not `id`, so the builtin id() is not
# shadowed in the notebook's global namespace after this cell runs.
for person_id in result_query2.take(5):
    print(person_id)

# Query 3
def query3(people_with_handshake_info_rdd, max_distance=6):
    """Count, for each activated person, how many other people are nearby.

    Args:
        people_with_handshake_info_rdd: RDD of records (id at index 0,
            coordinates at 1-2, handshake flag "yes"/"no" at index 4).
        max_distance: inclusive distance threshold (default 6).

    Returns:
        A lazy RDD of (person_id, count) for every record flagged "yes", where
        count is the number of other records within `max_distance`.
    """
    # Only id and coordinates are needed for the distance test. Projecting
    # them before collect() keeps the attribute string and flag out of the
    # driver-side copy and out of the broadcast.
    coords = people_with_handshake_info_rdd.map(lambda p: (p[0], p[1], p[2])).collect()
    coords_bc = people_with_handshake_info_rdd.context.broadcast(coords)

    def count_nearby(person):
        # A plain counter rather than sum(). Later cells in this notebook run
        # `from pyspark.sql.functions import sum`, which would shadow the
        # builtin if this cell were re-executed afterwards.
        count = 0
        for other in coords_bc.value:
            if person[0] != other[0] and distance(person, other) <= max_distance:
                count += 1
        return (person[0], count)

    return people_with_handshake_info_rdd.filter(lambda p: p[4] == "yes").map(count_nearby)

# Run Query 3 and print up to five (person_id, nearby_count) results.
result_query3 = query3(people_with_handshake_info_rdd)
print("\nQuery 3 Result (sample):")
for pair in result_query3.take(5):
    print(pair)

# Shut down the Spark session created at the top of this cell.
spark.stop()

1. Spark-RDDs: Scalable Virtual Handshake Applications
Datasets created successfully.

Query 1 Result (sample):
(147, 5501)
(1245, 4577)
(3275, 3579)
(3381, 7795)
(3552, 922)

Query 2 Result (sample):
1296
3552
3750
5286
6694

Query 3 Result (sample):
(53, 0)
(58, 0)
(61, 0)
(62, 0)
(70, 0)


In [3]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, when, mean

# Initialize Spark Session
spark = SparkSession.builder.appName("ECommerceAnalysis").getOrCreate()

# Load the dataset
df = spark.read.csv("/content/ecommerce_customer_data_custom_ratios.csv", header=True, inferSchema=True)

# Choose a target attribute
target_column = "Product Price"

# Define numeric and categorical features
numeric_features = ["Quantity", "Total Purchase Amount", "Customer Age", "Returns", "Churn"]
categorical_features = ["Gender", "Product Category"]

# Handle null values.
# Every numeric column (any NumericType: int, bigint, double, decimal, ...) is
# cast to double and has its nulls replaced by the column mean. String columns
# get the placeholder "Unknown". Type detection uses isinstance(): matching
# DataType.simpleString() against 'integer' never fires, because IntegerType
# reports "int" and LongType reports "bigint".
# NOTE(review): means are computed on the full dataset before the train/test
# split, so test rows influence the imputed values.
from pyspark.sql.types import NumericType, StringType

numeric_columns = [f.name for f in df.schema.fields if isinstance(f.dataType, NumericType)]
string_columns = [f.name for f in df.schema.fields if isinstance(f.dataType, StringType)]

for column in numeric_columns:
    df = df.withColumn(column, col(column).cast("double"))

if numeric_columns:
    # A single aggregation job computes every mean, instead of one job per column.
    column_means = df.select([mean(col(c)).alias(c) for c in numeric_columns]).first()
    # An all-null column has a None mean, which fillna cannot use; leave it as-is.
    fill_values = {c: column_means[c] for c in numeric_columns if column_means[c] is not None}
    if fill_values:
        df = df.fillna(fill_values)

if string_columns:
    df = df.fillna("Unknown", subset=string_columns)

# Split the dataset
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Create stages for the pipeline
stages = []

# Handle categorical features
for categoricalCol in categorical_features:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index", handleInvalid="keep")
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

# Assemble features
assembler_inputs = [c + "classVec" for c in categorical_features] + numeric_features
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features", handleInvalid="keep")
stages += [assembler]

# Define models
lr = LinearRegression(featuresCol="features", labelCol=target_column)
rf = RandomForestRegressor(featuresCol="features", labelCol=target_column)
gbt = GBTRegressor(featuresCol="features", labelCol=target_column)

# Create pipelines
lr_pipeline = Pipeline(stages=stages + [lr])
rf_pipeline = Pipeline(stages=stages + [rf])
gbt_pipeline = Pipeline(stages=stages + [gbt])

# Train models
lr_model = lr_pipeline.fit(train_df)
rf_model = rf_pipeline.fit(train_df)
gbt_model = gbt_pipeline.fit(train_df)

# Evaluate models
def evaluate_model(model, test_data):
    """Score a fitted pipeline model on `test_data`.

    Compares the "prediction" column against the global `target_column`.

    Returns:
        (rmse, mae, r2) as computed by RegressionEvaluator.
    """
    # Each evaluate() call below runs its own Spark job. Caching the
    # predictions keeps the full pipeline transform from being re-executed
    # three times.
    predictions = model.transform(test_data).cache()
    try:
        evaluator = RegressionEvaluator(labelCol=target_column, predictionCol="prediction")
        rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
        mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
        r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
    finally:
        # Release the cached partitions even if an evaluation fails.
        predictions.unpersist()
    return rmse, mae, r2

# Evaluate each fitted pipeline on the held-out split.
lr_metrics = evaluate_model(lr_model, test_df)
rf_metrics = evaluate_model(rf_model, test_df)
gbt_metrics = evaluate_model(gbt_model, test_df)

# Print the raw (RMSE, MAE, R2) tuples, one model per line.
print("\n".join((
    "Model Evaluation Results:",
    "-------------------------",
    "Metrics: RMSE, MAE, R2",
    f"Linear Regression: {lr_metrics}",
    f"Random Forest: {rf_metrics}",
    f"Gradient Boosted Trees: {gbt_metrics}",
)))

# Static discussion text; it does not inspect the metric values above.
print("\n".join((
    "\nAnalysis and Discussion:",
    "------------------------",
    "1. Model Performance:",
    "   - Comparing RMSE, MAE, and R2 values, we can see which model performs best.",
    "   - Lower RMSE and MAE, and higher R2 indicate better performance.",
    "2. Feature Importance:",
    "   - The Random Forest and Gradient Boosted Trees models can provide feature importance.",
    "   - This can help identify which features are most predictive of the Product Price.",
    "3. Model Complexity vs. Performance:",
    "   - Linear Regression is the simplest model but may not capture complex relationships.",
    "   - Random Forest and Gradient Boosted Trees are more complex and may perform better on non-linear data.",
    "4. Potential Improvements:",
    "   - Feature engineering: creating new features or transforming existing ones.",
    "   - Hyperparameter tuning: using techniques like cross-validation to optimize model parameters.",
    "   - Ensemble methods: combining predictions from multiple models for potentially better performance.",
)))

spark.stop()

Model Evaluation Results:
-------------------------
Metrics: RMSE, MAE, R2
Linear Regression: (141.19868977764872, 122.09277418476282, -7.202114260773662e-05)
Random Forest: (141.2095145336862, 122.09446598796457, -0.0002253646421315203)
Gradient Boosted Trees: (141.2582487086586, 122.11182146462424, -0.0009158785858793816)

Analysis and Discussion:
------------------------
1. Model Performance:
   - Comparing RMSE, MAE, and R2 values, we can see which model performs best.
   - Lower RMSE and MAE, and higher R2 indicate better performance.
2. Feature Importance:
   - The Random Forest and Gradient Boosted Trees models can provide feature importance.
   - This can help identify which features are most predictive of the Product Price.
3. Model Complexity vs. Performance:
   - Linear Regression is the simplest model but may not capture complex relationships.
   - Random Forest and Gradient Boosted Trees are more complex and may perform better on non-linear data.
4. Potential Improvements:

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, expr, col, sum, count, min, max, percentile_approx
import random
import string

# Initialize Spark Session (reuses an existing one if already running)
spark = (
    SparkSession.builder
    .appName("PurchaseTransactions")
    .getOrCreate()
)

# Step 1: Data Creation

def generate_random_string(length):
    """Return `length` ASCII letters (either case), drawn with replacement."""
    letters = random.choices(string.ascii_letters, k=length)
    return "".join(letters)

# Create Customers dataset
customers_data = [(i,
                   generate_random_string(random.randint(10, 20)),
                   random.randint(18, 100),
                   random.randint(1, 500),
                   random.uniform(100, 10000000))
                  for i in range(1, 50001)]

# Explicit DDL schemas. Without them, createDataFrame infers column types by
# walking every row of the local list on the driver (5,000,000 rows for
# Purchases). The declared types are the ones inference would choose
# (Python int -> BIGINT, float -> DOUBLE, str -> STRING).
customers_df = spark.createDataFrame(
    customers_data,
    "ID BIGINT, Name STRING, Age BIGINT, CountryCode BIGINT, Salary DOUBLE",
)

# Create Purchases dataset
purchases_data = [(i,
                   random.randint(1, 50000),
                   random.uniform(10, 2000),
                   random.randint(1, 15),
                   generate_random_string(random.randint(20, 50)))
                  for i in range(1, 5000001)]

purchases_df = spark.createDataFrame(
    purchases_data,
    "TransID BIGINT, CustID BIGINT, TransTotal DOUBLE, TransNumItems BIGINT, TransDesc STRING",
)

# Task 2.0: Load data into storage
customers_df.createOrReplaceTempView("Customers")
purchases_df.createOrReplaceTempView("Purchases")

print("Data loaded successfully.")

# Task 2.1: Filter out purchases above $600
spark.sql("""
    CREATE OR REPLACE TEMPORARY VIEW T1 AS
    SELECT * FROM Purchases
    WHERE TransTotal <= 600
""")

print("Task 2.1 completed: T1 created")

# Task 2.2: Group by Number of Items and calculate statistics
# percentile_approx gives an approximate median (default accuracy 10000).
result_2_2 = spark.sql("""
    SELECT
        TransNumItems,
        percentile_approx(TransTotal, 0.5) AS MedianAmount,
        MIN(TransTotal) AS MinAmount,
        MAX(TransTotal) AS MaxAmount
    FROM T1
    GROUP BY TransNumItems
    ORDER BY TransNumItems
""")

print("Task 2.2 Result:")
result_2_2.show()

# Task 2.3: Group by customer ID for young customers
spark.sql("""
    CREATE OR REPLACE TEMPORARY VIEW T3 AS
    SELECT
        P.CustID,
        C.Age,
        SUM(P.TransNumItems) AS TotalItems,
        SUM(P.TransTotal) AS TotalAmountSpent
    FROM T1 P
    JOIN Customers C ON P.CustID = C.ID
    WHERE C.Age BETWEEN 18 AND 25
    GROUP BY P.CustID, C.Age
""")

print("Task 2.3 completed: T3 created")

# Task 2.4: Compare customer pairs
# Every pair (C1, C2) where C1 is younger, spent more, and bought fewer items.
# The pairing is driven only by these conditions. An extra CustID ordering in
# the join would drop every qualifying pair whose younger customer has the
# larger ID. The strict age comparison already prevents self-pairs.
result_2_4 = spark.sql("""
    SELECT
        C1.CustID AS C1ID,
        C2.CustID AS C2ID,
        C1.Age AS Age1,
        C2.Age AS Age2,
        C1.TotalAmountSpent AS TotalAmount1,
        C2.TotalAmountSpent AS TotalAmount2,
        C1.TotalItems AS TotalItemCount1,
        C2.TotalItems AS TotalItemCount2
    FROM T3 C1
    JOIN T3 C2
        ON C1.Age < C2.Age
        AND C1.TotalAmountSpent > C2.TotalAmountSpent
        AND C1.TotalItems < C2.TotalItems
""")

print("Task 2.4 Result:")
result_2_4.show()

# Clean up
spark.catalog.dropTempView("T1")
spark.catalog.dropTempView("T3")

spark.stop()

Data loaded successfully.
Task 2.1 completed: T1 created
Task 2.2 Result:


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, expr, col, sum, count, min, max, percentile_approx
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer, Imputer
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
import random
import string

# Initialize Spark Session (reuses an existing one if already running)
spark = (
    SparkSession.builder
    .appName("PurchaseTransactions")
    .getOrCreate()
)

# Step 1: Data Creation

def generate_random_string(length):
    """Return `length` ASCII letters (either case), drawn with replacement."""
    letters = random.choices(string.ascii_letters, k=length)
    return "".join(letters)

# Create Customers dataset
customers_data = [(i,
                   generate_random_string(random.randint(10, 20)),
                   random.randint(18, 100),
                   random.randint(1, 500),
                   random.uniform(100, 10000000))
                  for i in range(1, 50001)]

# Explicit DDL schemas. Without them, createDataFrame infers column types by
# walking every row of the local list on the driver (5,000,000 rows for
# Purchases). The declared types are the ones inference would choose
# (Python int -> BIGINT, float -> DOUBLE, str -> STRING).
customers_df = spark.createDataFrame(
    customers_data,
    "ID BIGINT, Name STRING, Age BIGINT, CountryCode BIGINT, Salary DOUBLE",
)

# Create Purchases dataset
purchases_data = [(i,
                   random.randint(1, 50000),
                   random.uniform(10, 2000),
                   random.randint(1, 15),
                   generate_random_string(random.randint(20, 50)))
                  for i in range(1, 5000001)]

purchases_df = spark.createDataFrame(
    purchases_data,
    "TransID BIGINT, CustID BIGINT, TransTotal DOUBLE, TransNumItems BIGINT, TransDesc STRING",
)

# Task 2.0: Load data into storage
customers_df.createOrReplaceTempView("Customers")
purchases_df.createOrReplaceTempView("Purchases")

print("Data loaded successfully.")

# Task 2.1: Filter out purchases above $600
spark.sql("""
    CREATE OR REPLACE TEMPORARY VIEW T1 AS
    SELECT * FROM Purchases
    WHERE TransTotal <= 600
""")

print("Task 2.1 completed: T1 created")

# Task 2.2: Group by Number of Items and calculate statistics
# percentile_approx gives an approximate median (default accuracy 10000).
result_2_2 = spark.sql("""
    SELECT
        TransNumItems,
        percentile_approx(TransTotal, 0.5) AS MedianAmount,
        MIN(TransTotal) AS MinAmount,
        MAX(TransTotal) AS MaxAmount
    FROM T1
    GROUP BY TransNumItems
    ORDER BY TransNumItems
""")

print("Task 2.2 Result:")
result_2_2.show()

# Task 2.3: Group by customer ID for young customers
spark.sql("""
    CREATE OR REPLACE TEMPORARY VIEW T3 AS
    SELECT
        P.CustID,
        C.Age,
        SUM(P.TransNumItems) AS TotalItems,
        SUM(P.TransTotal) AS TotalAmountSpent
    FROM T1 P
    JOIN Customers C ON P.CustID = C.ID
    WHERE C.Age BETWEEN 18 AND 25
    GROUP BY P.CustID, C.Age
""")

print("Task 2.3 completed: T3 created")

# Task 2.4: Compare customer pairs
# Every pair (C1, C2) where C1 is younger, spent more, and bought fewer items.
# The pairing is driven only by these conditions. An extra CustID ordering in
# the join would drop every qualifying pair whose younger customer has the
# larger ID. The strict age comparison already prevents self-pairs.
result_2_4 = spark.sql("""
    SELECT
        C1.CustID AS C1ID,
        C2.CustID AS C2ID,
        C1.Age AS Age1,
        C2.Age AS Age2,
        C1.TotalAmountSpent AS TotalAmount1,
        C2.TotalAmountSpent AS TotalAmount2,
        C1.TotalItems AS TotalItemCount1,
        C2.TotalItems AS TotalItemCount2
    FROM T3 C1
    JOIN T3 C2
        ON C1.Age < C2.Age
        AND C1.TotalAmountSpent > C2.TotalAmountSpent
        AND C1.TotalItems < C2.TotalItems
""")

print("Task 2.4 Result:")
result_2_4.show()