<a href="https://colab.research.google.com/github/Avinash242624/project2/blob/main/Project2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install boto3


Collecting boto3
  Downloading boto3-1.35.76-py3-none-any.whl.metadata (6.7 kB)
Collecting botocore<1.36.0,>=1.35.76 (from boto3)
  Downloading botocore-1.35.76-py3-none-any.whl.metadata (5.7 kB)
Collecting jmespath<2.0.0,>=0.7.1 (from boto3)
  Downloading jmespath-1.0.1-py3-none-any.whl.metadata (7.6 kB)
Collecting s3transfer<0.11.0,>=0.10.0 (from boto3)
  Downloading s3transfer-0.10.4-py3-none-any.whl.metadata (1.7 kB)
Downloading boto3-1.35.76-py3-none-any.whl (139 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading botocore-1.35.76-py3-none-any.whl (13.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.2/13.2 MB[0m [31m34.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading jmespath-1.0.1-py3-none-any.whl (20 kB)
Downloading s3transfer-0.10.4-py3-none-any.whl (83 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m83.2/83.2 kB[0m [31m7.4 MB/s[0m eta [36m0:0

In [None]:
import os
import json
import boto3
import pandas as pd
from botocore import UNSIGNED
from botocore.client import Config

def download_and_convert_ndjson_to_csv(bucket_name, local_folder):
    """Download every ``.json`` (NDJSON) object in a public S3 bucket and convert it to CSV.

    An unsigned (anonymous) client is used, so this only works for publicly
    readable buckets. Object listing is paginated: one ``list_objects_v2`` call
    returns at most 1000 keys, so a single call would silently skip the rest
    of a larger bucket.

    Args:
        bucket_name: Name of the S3 bucket to read.
        local_folder: Directory the CSV files are written under; passed through
            to ``convert_ndjson_content_to_csv``.
    """
    # Create an S3 client with anonymous access
    s3_client = boto3.client('s3', region_name='us-east-1', config=Config(signature_version=UNSIGNED))

    paginator = s3_client.get_paginator('list_objects_v2')
    found_any = False

    for page in paginator.paginate(Bucket=bucket_name):
        # Pages for an empty bucket (or empty tail page) carry no 'Contents'.
        for obj in page.get('Contents', []):
            found_any = True
            file_key = obj['Key']

            # Only NDJSON objects are converted
            if not file_key.endswith('.json'):
                continue

            print(f"Processing {file_key}...")

            # Read the object body and release the HTTP stream even if decoding fails.
            body = s3_client.get_object(Bucket=bucket_name, Key=file_key)['Body']
            try:
                json_content = body.read().decode('utf-8')
            finally:
                body.close()

            convert_ndjson_content_to_csv(json_content, file_key, local_folder)

    if not found_any:
        print("No files found in the bucket.")

def convert_ndjson_content_to_csv(json_content, file_key, local_folder):
    """Convert NDJSON content to CSV format and save it.

    Each non-blank line of ``json_content`` is parsed with ``json.loads`` and
    becomes one row of a pandas DataFrame, which is written (without the
    index) as UTF-8 CSV. The output path is ``file_key`` placed under
    ``local_folder`` with a trailing ``.json`` replaced by ``.csv``.

    Failures are printed rather than raised, so one bad object does not stop
    the caller's loop over the bucket.

    Args:
        json_content: Decoded NDJSON text.
        file_key: S3 object key the text came from; determines the CSV path.
            Treated as untrusted: keys that would resolve outside
            ``local_folder`` (leading ``/`` or ``..`` segments) are rejected.
        local_folder: Directory under which the CSV is written.
    """
    try:
        # One JSON object per non-blank line.
        data = [json.loads(line) for line in json_content.splitlines() if line.strip()]
        df = pd.DataFrame(data)

        # Swap only the trailing extension; str.replace would also rewrite
        # ".json" appearing inside a directory name of the key.
        if file_key.endswith('.json'):
            csv_key = file_key[:-len('.json')] + '.csv'
        else:
            csv_key = file_key + '.csv'

        # A key starting with "/" would make os.path.join discard local_folder,
        # and ".." segments could climb out of it; keep the result inside.
        csv_file_path = os.path.join(local_folder, csv_key.lstrip('/'))
        root = os.path.abspath(local_folder)
        if os.path.commonpath([root, os.path.abspath(csv_file_path)]) != root:
            raise ValueError(f"key resolves outside {local_folder!r}")

        # dirname is "" when local_folder is "" and the key has no "/";
        # os.makedirs("") would raise.
        parent_dir = os.path.dirname(csv_file_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        # Save the DataFrame to a CSV file
        df.to_csv(csv_file_path, index=False, encoding='utf-8')
        print(f"Converted to CSV: {csv_file_path}")

    except json.JSONDecodeError as e:
        print(f"JSON decoding error in {file_key}: {e}")
    except Exception as e:
        print(f"Error converting {file_key} to CSV: {e}")

if __name__ == "__main__":
    # Public source bucket and the local destination for the converted CSVs.
    bucket_name = 'helpful-sentences-from-reviews'
    local_folder = './customer_review_dataset'

    # exist_ok turns this into a no-op when the folder is already there.
    os.makedirs(local_folder, exist_ok=True)

    download_and_convert_ndjson_to_csv(bucket_name, local_folder)
    # Note: per-file conversion errors are printed by the converter, not raised,
    # so this message is printed regardless of whether every file succeeded.
    print("All files converted to CSV and saved successfully!")

Processing test.json...
Converted to CSV: ./customer_review_dataset/test.csv
Processing train.json...
Converted to CSV: ./customer_review_dataset/train.csv
All files converted to CSV and saved successfully!


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, to_date, when
import os

# "Online Retail.csv" is the transactional sales table (InvoiceNo, Quantity,
# UnitPrice, InvoiceDate, ...) and train.csv holds the review sentences
# (asin, sentence, helpful, ...); these two paths used to be assigned the
# other way round, so "sales" cleaning was being applied to the reviews.
sales_data_path = "/content/input/Online Retail.csv"
reviews_data_path = "/content/input/train.csv"

# Define output paths (Spark writes each one as a directory of part files)
output_sales_data_path = "/content/output/sales_data_clean.csv"
output_reviews_data_path = "/content/output/review_data_clean.csv"

# Date column of the sales table and its string pattern (same pattern the
# forecasting cell uses for InvoiceDate).
SALES_DATE_COLUMN = "InvoiceDate"
SALES_DATE_FORMAT = "MM/d/yyyy H:mm"

# Raise instead of calling exit(): inside a Jupyter kernel exit(1) does not
# reliably stop the rest of the cell from executing.
for label, path in (("Sales", sales_data_path), ("Reviews", reviews_data_path)):
    if not os.path.exists(path):
        raise FileNotFoundError(f"Error: {label} data file not found at {path}")

# LEGACY parser policy, as in the other Spark cells, so the lenient
# month/day pattern above accepts the InvoiceDate strings.
spark = (
    SparkSession.builder
    .appName("Retail Sales and Demand Forecasting")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

# Load the datasets from CSV files
sales_df = spark.read.csv(sales_data_path, header=True, inferSchema=True)
# train.csv is presumably the pandas-written file from the download cell —
# pandas escapes embedded quotes by doubling them, whereas Spark's default CSV
# escape character is a backslash, so set escape='"' and allow quoted newlines.
reviews_df = spark.read.csv(
    reviews_data_path, header=True, inferSchema=True, multiLine=True, escape='"'
)

# Print the schema to understand data types
print("Sales Data Schema:")
sales_df.printSchema()
print("Reviews Data Schema:")
reviews_df.printSchema()


def _print_missing_counts(df, title):
    """Print ``title``, then ``column: n`` for each column of ``df`` with n > 0 nulls.

    All null counts are computed in a single aggregation rather than one
    Spark job per column.
    """
    print(title)
    null_counts = df.select(
        [count(when(df[c].isNull(), 1)).alias(c) for c in df.columns]
    ).first()
    for column, missing_count in zip(df.columns, null_counts):
        if missing_count > 0:
            print(f"{column}: {missing_count}")


_print_missing_counts(sales_df, "Missing Values in Sales Data:")
_print_missing_counts(reviews_df, "Missing Values in Reviews Data:")

# Data Cleansing: drop exact duplicate rows
sales_df = sales_df.dropDuplicates()
reviews_df = reviews_df.dropDuplicates()

# Drop sales rows without a date, then parse the date string into a DateType.
if SALES_DATE_COLUMN in sales_df.columns:
    sales_df = sales_df.na.drop(subset=[SALES_DATE_COLUMN])
    sales_df = sales_df.withColumn(
        SALES_DATE_COLUMN, to_date(sales_df[SALES_DATE_COLUMN], SALES_DATE_FORMAT)
    )
else:
    print(f"No '{SALES_DATE_COLUMN}' column found in sales data")

# Drop rows with missing review data
reviews_df = reviews_df.na.drop()

# Create output directory if it doesn't exist
os.makedirs(os.path.dirname(output_sales_data_path), exist_ok=True)
os.makedirs(os.path.dirname(output_reviews_data_path), exist_ok=True)

# Output the cleaned data to CSV files
sales_df.write.csv(output_sales_data_path, header=True, mode="overwrite")
reviews_df.write.csv(output_reviews_data_path, header=True, mode="overwrite")

# Print confirmation of saved files
print(f"Cleaned sales data saved to {output_sales_data_path}")
print(f"Cleaned reviews data saved to {output_reviews_data_path}")

# Stop the Spark session
spark.stop()


Sales Data Schema:
root
 |-- asin: string (nullable = true)
 |-- sentence: string (nullable = true)
 |-- helpful: string (nullable = true)
 |-- main_image_url: string (nullable = true)
 |-- product_title: string (nullable = true)

Reviews Data Schema:
root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)

Missing Values in Sales Data:
main_image_url: 3
product_title: 4
Missing Values in Reviews Data:
Description: 1454
CustomerID: 135080
No 'date' column found in sales data
Cleaned sales data saved to /content/output/sales_data_clean.csv
Cleaned reviews data saved to /content/output/review_data_clean.csv


In [None]:
from pyspark.sql import SparkSession
# Namespace import: importing `sum` directly would shadow Python's builtin
# sum() for every later cell in the notebook.
from pyspark.sql import functions as F
import os
import shutil

# Initialize SparkSession with legacy time parser policy
spark = (
    SparkSession.builder
    .appName("Sales Data Aggregation and Feature Engineering")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

# Define file paths
online_retail_data_path = "/content/input/Online Retail.csv"

# Define output path
output_base_path = "/content/output/task2"

# Start every run from an empty output directory; a stray file at this path
# is replaced as well.
if os.path.isdir(output_base_path):
    shutil.rmtree(output_base_path)
elif os.path.exists(output_base_path):
    print(f"Conflict detected: {output_base_path} is a file. Deleting it.")
    os.remove(output_base_path)
os.makedirs(output_base_path)


def _write_task2_csv(df, name):
    """Write ``df`` as a header-ed CSV directory at ``output_base_path/name``; return the path."""
    path = os.path.join(output_base_path, name)
    df.write.csv(path, header=True, mode="overwrite")
    return path


# Load the dataset, parse the invoice date and derive line-item revenue.
online_retail_df = (
    spark.read.csv(online_retail_data_path, header=True, inferSchema=True)
    .withColumn("InvoiceDate", F.to_date(F.col("InvoiceDate"), "MM/dd/yyyy"))
    .withColumn("Revenue", F.col("Quantity") * F.col("UnitPrice"))
)

# Customer-level metrics ignore rows without a CustomerID; otherwise all of
# those rows would be grouped into a single fake "null" customer.
known_customers_df = online_retail_df.filter(F.col("CustomerID").isNotNull())

# Sales Aggregation: total units per product per calendar month. The year is
# part of the key so the same month in different years is not merged.
total_sales_per_product_per_month = online_retail_df.groupBy(
    "StockCode",
    F.year("InvoiceDate").alias("year"),
    F.month("InvoiceDate").alias("month"),
).agg(F.sum("Quantity").alias("total_sales"))

# Average revenue (Quantity * UnitPrice) per invoice line for each customer.
# Averaging UnitPrice alone would ignore how many units were bought.
average_revenue_per_customer = known_customers_df.groupBy("CustomerID").agg(
    F.avg("Revenue").alias("average_revenue")
)

# Top-selling products by total units, highest first
top_selling_products = (
    online_retail_df.groupBy("StockCode")
    .agg(F.sum("Quantity").alias("total_sales"))
    .orderBy(F.col("total_sales").desc())
)

# Customer lifetime value: total revenue per customer (a monetary value,
# not a unit count).
customer_lifetime_value = known_customers_df.groupBy("CustomerID").agg(
    F.sum("Revenue").alias("lifetime_value")
)

# Product popularity score: number of transaction lines per product
product_popularity_score = (
    online_retail_df.groupBy("StockCode")
    .count()
    .withColumnRenamed("count", "popularity_score")
)

# Seasonal trends: month-of-year only, deliberately pooled across years
seasonal_trends = online_retail_df.groupBy(
    "StockCode", F.month("InvoiceDate").alias("month")
).agg(F.sum("Quantity").alias("monthly_sales"))

for output_name, output_df in [
    ("total_sales_per_product_per_month", total_sales_per_product_per_month),
    ("average_revenue_per_customer", average_revenue_per_customer),
    ("top_selling_products", top_selling_products),
    ("customer_lifetime_value", customer_lifetime_value),
    ("product_popularity_score", product_popularity_score),
    ("seasonal_trends", seasonal_trends),
]:
    _write_task2_csv(output_df, output_name)

print(f"Task 2 outputs written under {output_base_path}")

# Stop Spark session
spark.stop()


Conflict detected: /content/output/task2 is a file. Deleting it.


In [None]:
from pyspark.sql import SparkSession
# Alias `sum` so the builtin sum() is not shadowed in the notebook namespace.
from pyspark.sql.functions import to_date, col, year, month, sum as spark_sum
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
import os

SEED = 42
SAMPLE_LIMIT = 10000   # reduce dataset size for faster runs; set to None for the full data
TRAIN_FRACTION = 0.8   # share of aggregated rows used for fitting; the rest is held out

# Initialize Spark Session
spark = (
    SparkSession.builder
    .appName("Retail Sales Demand Forecasting")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

# Load Data
raw_df = spark.read.csv('/content/input/Online Retail.csv', header=True, inferSchema=True)

# Preprocess and aggregate to (Year, Month, StockCode) -> TotalQuantity.
monthly_df = (
    raw_df
    .withColumn("InvoiceDate", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
    # Require only the columns this model uses; dropping rows with *any* null
    # would also discard every transaction lacking an unrelated field such as
    # CustomerID and understate the quantities.
    .dropna(subset=["InvoiceDate", "StockCode", "Quantity"])
    .withColumn('Year', year(col('InvoiceDate')))
    .withColumn('Month', month(col('InvoiceDate')))
    .groupBy('Year', 'Month', 'StockCode')
    .agg(spark_sum('Quantity').alias('TotalQuantity'))
)

if SAMPLE_LIMIT is not None:
    # limit() without an ordering returns an arbitrary subset of rows.
    monthly_df = monthly_df.limit(SAMPLE_LIMIT)

# Materialize once so both halves of the split come from the same rows
# (limit() upstream is not deterministic across recomputations).
monthly_df = monthly_df.cache()
monthly_df.count()

# Hold out a test set so the reported metrics are not computed on training data.
train_df, test_df = monthly_df.randomSplit([TRAIN_FRACTION, 1.0 - TRAIN_FRACTION], seed=SEED)

# Assemble Features -> scale -> linear regression
assembler = VectorAssembler(inputCols=['Year', 'Month'], outputCol='features')
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
lr = LinearRegression(featuresCol='scaledFeatures', labelCol='TotalQuantity')
pipeline = Pipeline(stages=[assembler, scaler, lr])

# Simplified Hyperparameter Tuning
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1]) \
    .addGrid(lr.elasticNetParam, [0.5]) \
    .build()

evaluator = RegressionEvaluator(labelCol="TotalQuantity")
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=2,  # Reduced folds for faster processing
                          seed=SEED)

# Fit on the training split only
print("Starting model training with cross-validation...")
cvModel = crossval.fit(train_df)
print("Model training completed.")

# Predict and Evaluate on the held-out split
predictions = cvModel.transform(test_df)
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})

# Ensure Output Directory Exists
task3_output_dir = '/content/output/task3'
if os.path.exists(task3_output_dir) and not os.path.isdir(task3_output_dir):
    os.remove(task3_output_dir)  # Remove if it's a file
os.makedirs(task3_output_dir, exist_ok=True)

# Writing RMSE to a File
rmse_file_path = os.path.join(task3_output_dir, 'rmse.txt')
with open(rmse_file_path, 'w') as f:
    f.write(f"Root Mean Square Error (RMSE) on test data = {rmse}\n")

# Writing MAE to a File
mae_file_path = os.path.join(task3_output_dir, 'mae.txt')
with open(mae_file_path, 'w') as f:
    f.write(f"Mean Absolute Error (MAE) on test data = {mae}\n")

# Stop Spark Session
spark.stop()

print(f"RMSE and MAE written to files:\n{rmse_file_path}\n{mae_file_path}")


Starting model training with cross-validation...
Model training completed.
RMSE and MAE written to files:
/content/output/task3/rmse.txt
/content/output/task3/mae.txt


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, NGram, SQLTransformer, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import avg, col
import os

SEED = 42
NUM_HASH_FEATURES = 10000
# NOTE(review): assumes 'helpful' is a numeric helpfulness score where values
# >= 0.5 mean "helpful" — TODO confirm the scale and threshold for this dataset.
HELPFUL_THRESHOLD = 0.5

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Sentiment Analysis") \
    .getOrCreate()

# Load data
data_path = "/content/input/train.csv"  # Update the path to your CSV file

# train.csv is presumably the pandas-written file from the download cell;
# pandas escapes embedded quotes by doubling them, while Spark's default CSV
# escape is a backslash, so set escape='"' and allow quoted newlines.
labeled_df = (
    spark.read.csv(data_path, header=True, multiLine=True, escape='"')
    .select(
        # asin is the product identifier; main_image_url is only an image link.
        col("asin").alias("product_id"),
        col("sentence"),
        col("helpful").cast("double").alias("helpful"),
    )
    # Rows without text or score can be neither featurized nor labelled.
    .dropna(subset=["sentence", "helpful"])
    # A real target: the old label (sentence is not null) was 1 for every row,
    # so the classifier and its AUC carried no information.
    .withColumn("label", (col("helpful") >= HELPFUL_THRESHOLD).cast("int"))
)

# Binary classification (and AUC) needs both classes to be present.
label_classes = sorted(row["label"] for row in labeled_df.select("label").distinct().collect())
if len(label_classes) < 2:
    raise ValueError(
        f"Expected two label classes, found {label_classes}; adjust HELPFUL_THRESHOLD."
    )

# Text preprocessing and features, all inside one pipeline so IDF weights and
# the classifier are fit on the training split only.
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
ngram = NGram(n=2, inputCol="filtered", outputCol="ngrams")
# Unigrams + bigrams as one bag of terms, via native SQL concat (no Python UDF).
combine_terms = SQLTransformer(
    statement="SELECT *, concat(filtered, ngrams) AS rawFeatures FROM __THIS__"
)
hashingTF = HashingTF(inputCol="rawFeatures", outputCol="features", numFeatures=NUM_HASH_FEATURES)
idf = IDF(inputCol="features", outputCol="finalFeatures")
lr = LogisticRegression(featuresCol='finalFeatures', labelCol='label')

pipeline = Pipeline(stages=[tokenizer, remover, ngram, combine_terms, hashingTF, idf, lr])

# Split data into training and test sets (seeded for reproducibility)
(trainingData, testData) = labeled_df.randomSplit([0.7, 0.3], seed=SEED)

final_model = pipeline.fit(trainingData)

# Predictions
predictions = final_model.transform(testData)

# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)

# Ensure the task4 output directory exists
task4_dir = "/content/output/task4"
if os.path.exists(task4_dir) and not os.path.isdir(task4_dir):
    os.remove(task4_dir)  # Remove if it's a file
os.makedirs(task4_dir, exist_ok=True)

# Per-product mean of the predicted label over the test split (column name
# kept as avg_sentiment for compatibility with existing outputs).
product_sentiments_path = os.path.join(task4_dir, "product_sentiments")
product_sentiment = predictions.groupBy("product_id").agg(avg("prediction").alias("avg_sentiment"))
product_sentiment.write.csv(product_sentiments_path, header=True, mode="overwrite")

# Save evaluation results in a text file
results_file_path = os.path.join(task4_dir, "evaluation_results.txt")
with open(results_file_path, "w") as file:
    file.write(f"Area Under ROC: {auc}\n")

# Stop Spark session
spark.stop()

print(f"Results saved in {task4_dir}")


Results saved in /content/output/task4
